
Kafka Connect Configuration for PostgreSQL

Published at 12:12 PM


Background

Purpose:

Automatically pipe data from Kafka through Kafka Connect into PostgreSQL, all in Protobuf format:

kafka ---> kafka connect ---> PG

Preparation:

Download the Confluent Platform first: the plain Apache Kafka download does not include the Schema Registry or the Protobuf converters used here, so download the Confluent Platform directly, which bundles Kafka Connect together with them.

Also download the kafka-connect-jdbc plugin.

Then, in the Kafka Connect worker properties file, modify the plugin path so it includes the directory where the kafka-connect-jdbc archive was unpacked (in this post, /usr/share/java).
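For example, the relevant worker-properties line, assuming the plugin was unpacked under /usr/share/java:

plugin.path=/usr/share/java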

Note that on the Postgres machine, this line must be added to pg_hba.conf:

host all all 172.31.116.134/32 md5
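After editing pg_hba.conf, reload the Postgres configuration so the new rule takes effect (no restart required):

psql -U postgres -c 'SELECT pg_reload_conf();'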

Check that Postgres itself accepts connections on both machines (the one running Kafka Connect and the one running Postgres):

psql -h 127.0.0.1 -p 5432 -U postgres -d postgres
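From the Kafka Connect machine, run the same check against the Postgres host's real address and port; the values here are the ones used in the sink configuration later in this post:

psql -h 172.31.112.1 -p 5444 -U postgres -d postgres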

Start Zookeeper

cd confluent-7.5.2
bash bin/zookeeper-server-start etc/kafka/zookeeper.properties

[Image: start-zookeeper]

Start Kafka Server

bash bin/kafka-server-start etc/kafka/server.properties

[Image: start-kafka-server]

Create topic

bash bin/kafka-topics --bootstrap-server localhost:9092 --topic transactions-proto --create --partitions 3 --replication-factor 1

[Image: create-topic]

To delete this topic:

bash /home/admin/confluent-7.5.2/bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic transactions-proto

Make sure the topic was created:

bash bin/kafka-topics --bootstrap-server localhost:9092 --list

[Image: topic-created]

Start Kafka Schema Registry

bash bin/schema-registry-start etc/schema-registry/schema-registry.properties

[Image: schema-registry]

Register the Protobuf Schema with the Schema Registry

First, check whether the subject transactions-proto-value already exists in the Schema Registry:

curl -X GET localhost:8081/subjects/transactions-proto-value/versions

To delete a registered version:

curl -X DELETE localhost:8081/subjects/transactions-proto-value/versions/1

Sample Protobuf definition (my_record.proto):

syntax = "proto3";
message MyRecord {
  string id = 1;
  float amount = 2;
  int64 timestamp = 3;
}
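The Python producer shown later imports a generated my_record_pb2 module; assuming protoc is installed, generate it from this file with:

protoc --python_out=. my_record.proto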

Register this Protobuf schema with the Schema Registry using Python:

import requests

def read_proto_file(file_path):
  """Read the .proto schema file as a string."""
  try:
    with open(file_path, 'r') as file:
      return file.read()
  except IOError as e:
    print(f"Error reading file {file_path}: {e}")
    return None

proto_path = '/home/admin/my_record.proto'
schema_string = read_proto_file(proto_path)
schema_subject = 'transactions-proto-value'

if schema_string:
  # Register the schema under the subject <topic>-value
  schema = {
    "schemaType": "PROTOBUF",
    "schema": schema_string
  }
  response = requests.post(
    f'http://localhost:8081/subjects/{schema_subject}/versions',
    json=schema
  )
  if response.status_code == 200:
    print("Schema registered successfully:", response.json())
  else:
    print("Failed to register schema:", response.text)

Start Kafka Connect

bash bin/connect-standalone etc/schema-registry/connect-protobuf-standalone.properties etc/schema-registry/sink-quickstart-Postgres.properties

If you want to write to two PG tables at the same time:

bash bin/connect-standalone etc/schema-registry/connect-protobuf-standalone.properties etc/schema-registry/sink-quickstart-Postgres.properties etc/schema-registry/second-sink.properties
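For reference, a minimal sketch of what connect-protobuf-standalone.properties might contain; these values are assumptions based on the defaults used elsewhere in this post:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/usr/share/java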

sink-quickstart-Postgres.properties:

name=test-sink-pg
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

topics=transactions-proto

connection.url=jdbc:postgresql://172.31.112.1:5444/postgres
table.name.format=transactions_proto_data
connection.user=postgres
connection.password=postgres
auto.create=true
# upsert provides idempotence!
insert.mode=upsert
pk.mode=record_value
pk.fields=id,dt2
auto.evolve=true

[Image: start-kafka-connect]

Check PB consumer

bash bin/kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic transactions-proto

[Image: pb-consumer]

PB producer

bash bin/kafka-protobuf-console-producer --bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 --topic transactions-proto \
--property value.schema='syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; int64 timestamp = 3;}'

{"id":"1001","amount":5550 ,"timestamp":1701945761407673069}

To send using Python instead:

import time
from uuid import uuid4

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField

import my_record_pb2  # generated from my_record.proto with protoc

topic = 'transactions-proto'
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
string_serializer = StringSerializer('utf8')
protobuf_serializer = ProtobufSerializer(
  my_record_pb2.MyRecord, schema_registry_client, {'use.deprecated.format': False})
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
  if err is not None:
    print('Message delivery failed:', err)
  else:
    print('Message delivered to', msg.topic(), msg.partition())

# Send records with ids 4001..9001
cnt = 4001
while cnt <= 9001:
  my_record = my_record_pb2.MyRecord(id=str(cnt), amount=123.456, timestamp=int(time.time_ns()))
  cnt += 1
  producer.produce(
    topic=topic,
    partition=0,
    key=string_serializer(str(uuid4())),
    value=protobuf_serializer(my_record, SerializationContext(topic, MessageField.VALUE)),
    on_delivery=delivery_report)
  producer.poll(0)
  time.sleep(0.001)

producer.flush()

Check the results in PG:
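For example, a quick sanity query against the sink table defined above:

psql -h 172.31.112.1 -p 5444 -U postgres -d postgres -c 'SELECT count(*) FROM transactions_proto_data;'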

[Image: pg-results]

Penetration Delay Measurement

[Image: Penetration Delay]

The average end-to-end ("penetration") delay per message is only 25 ms (producer send —> Kafka —> Kafka Connect —> Postgres —> read).

The IDs in this graph are not contiguous because the while loop in my measurement script sometimes cannot keep up with the rate of inserts into PG: out of 5000 insertions, the loop caught only 652 of them.
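For reference, a minimal sketch of the kind of probe used here: produce a record, then poll PG until the row appears and report the elapsed time. The psycopg2 usage and the exact polling logic are my assumptions; the table name and connection details come from the sink config above.

import time
import psycopg2

# Connection details taken from the sink configuration above
conn = psycopg2.connect(host='172.31.112.1', port=5444,
                        user='postgres', password='postgres', dbname='postgres')
conn.autocommit = True

def wait_for_row(record_id, sent_ns, timeout_s=5.0):
  """Poll PG until the row with this id appears, then report the end-to-end delay."""
  deadline = time.time() + timeout_s
  with conn.cursor() as cur:
    while time.time() < deadline:
      cur.execute('SELECT 1 FROM transactions_proto_data WHERE id = %s', (str(record_id),))
      if cur.fetchone():
        print(f'id={record_id} visible after {(time.time_ns() - sent_ns) / 1e6:.1f} ms')
        return True
  return False  # the loop did not catch this insert in time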

ByteArray Serializer Test

bash bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic transactions-proto --from-beginning

pb-difference

As you can see, the value serialized with the Protobuf serializer has extra bytes prepended to it, compared with the ByteArray serializer's output. This matches what the Confluent documentation says: every serialized value is prefixed with a magic byte and a 4-byte schema ID:

[Image: magic-byte]
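For the curious, a minimal sketch of decoding that prefix, assuming the standard Confluent wire format (magic byte 0x00 followed by a 4-byte big-endian schema ID; for Protobuf, a message-index list precedes the payload):

import struct

def inspect_wire_format(value: bytes):
  """Decode the Confluent wire-format prefix of a serialized record value."""
  magic = value[0]                                # 0x00 in the Confluent wire format
  schema_id = struct.unpack('>I', value[1:5])[0]  # 4-byte big-endian schema ID
  print(f'magic byte: {magic}, schema id: {schema_id}')
  # the remaining bytes are the message-index list plus the Protobuf payload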
