Background
Purpose:
Automatically move data along kafka ---> kafka connect ---> PG (all in Protobuf format).
Preparation:
Download the Confluent Platform first: the plain Kafka download does not include the Kafka Connect pieces used here, so download the Confluent Platform package directly to get Kafka Connect (plus Schema Registry and the Protobuf converter).
And download kafka-connect-jdbc.
Then modify the plug-in path in the Kafka Connect worker properties file so that it includes the directory where kafka-connect-jdbc was extracted (in this document it is placed under /usr/share/java, for example).
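For reference, the worker properties file used later (connect-protobuf-standalone.properties) should then contain a plugin path line along these lines; the exact directory is just the one assumed in this walkthrough:

plugin.path=/usr/share/java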
Note that on the PostgreSQL machine you need to add the Kafka Connect host to pg_hba.conf:
host all all 172.31.116.134/32 md5
Check that PostgreSQL is reachable between the two machines (the one running Kafka Connect and the one running PostgreSQL):
psql -h 127.0.0.1 -p 5432 -U postgres -d postgres
Start zookeeper
cd confluent-7.5.2
bash bin/zookeeper-server-start etc/kafka/zookeeper.properties
Start Kafka Server
bash bin/kafka-server-start etc/kafka/server.properties
Create topic
bash bin/kafka-topics --bootstrap-server localhost:9092 --topic transactions-proto --create --partitions 3 --replication-factor 1
To delete this topic:
bash /home/admin/confluent-7.5.2/bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic transactions-proto
Make sure topic is created:
bash bin/kafka-topics --bootstrap-server localhost:9092 --list
Start Kafka Schema Registry
bash bin/schema-registry-start etc/schema-registry/schema-registry.properties
Register Protobuf to Schema Registry
First we check if transactions-proto-value is present in the current Schema Registry:
curl -X GET localhost:8081/subjects/transactions-proto-value/versions
To delete:
curl -X DELETE localhost:8081/subjects/transactions-proto-value/versions/1
Sample Protobuf schema (saved as /home/admin/my_record.proto, used below):
syntax = "proto3";

message MyRecord {
  string id = 1;
  float amount = 2;
  int64 timestamp = 3;
}
Register this Protobuf schema with the Schema Registry using the following Python code:
import requests

def read_proto_file(file_path):
    try:
        with open(file_path, 'r') as file:
            return file.read()
    except IOError as e:
        print(f"Error reading file {file_path}: {e}")
        return None

proto_path = '/home/admin/my_record.proto'
schema_string = read_proto_file(proto_path)
if schema_string:
    print(schema_string)

schema_subject = 'transactions-proto-value'
schema = {
    "schemaType": "PROTOBUF",   # not AVRO (the default), so the type must be spelled out
    "schema": schema_string
}

response = requests.post(
    f'http://localhost:8081/subjects/{schema_subject}/versions',
    json=schema
)
if response.status_code == 200:
    print("Schema registered successfully")
else:
    print("Failed to register schema:", response.text)
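Once the POST succeeds, the registration can be sanity-checked through the same REST API; a minimal sketch, assuming the registry is still on localhost:8081:

import requests

# List the versions registered under the subject, then fetch the latest one
subject = 'transactions-proto-value'
versions = requests.get(f'http://localhost:8081/subjects/{subject}/versions').json()
print('Registered versions:', versions)
latest = requests.get(f'http://localhost:8081/subjects/{subject}/versions/latest').json()
print('Schema type:', latest.get('schemaType'), 'schema id:', latest.get('id'))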
Start Kafka Connect
bash bin/connect-standalone etc/schema-registry/connect-protobuf-standalone.properties etc/schema-registry/sink-quickstart-Postgres.properties
If you want to write to two PG tables at the same time:
bash bin/connect-standalone etc/schema-registry/connect-protobuf-standalone.properties etc/schema-registry/sink-quickstart-Postgres.properties etc/schema-registry/second-sink.properties
sink-quickstart-Postgres.properties:
name=test-sink-pg
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions-proto
connection.url=jdbc:postgresql://172.31.112.1:5444/postgres
table.name.format=transactions_proto_data
connection.user=postgres
connection.password=postgres
auto.create=true
# upsert mode provides idempotence
insert.mode=upsert
pk.mode=record_value
pk.fields=id,dt2
auto.evolve=true
Check PB consumer
bash bin/kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic transactions-proto
PB producer
bash bin/kafka-protobuf-console-producer --bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 --topic transactions-proto \
--property value.schema='syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; int64 timestamp = 3;}'
{"id":"1001","amount":5550 ,"timestamp":1701945761407673069}
If sending from Python instead:
import time
from uuid import uuid4
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
import my_record_pb2  # Python bindings generated from my_record.proto (see the note below)

topic = 'transactions-proto'
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
string_serializer = StringSerializer('utf8')
protobuf_serializer = ProtobufSerializer(my_record_pb2.MyRecord, schema_registry_client, {'use.deprecated.format': False})
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed:', err)
    else:
        print('Message delivered to', msg.topic(), msg.partition())

cnt = 4001
while True:
    if cnt > 9001:
        break
    my_record = my_record_pb2.MyRecord(id=str(cnt), amount=123.456, timestamp=int(time.time_ns()))
    cnt += 1
    producer.produce(topic=topic, partition=0,
                     key=string_serializer(str(uuid4())),
                     value=protobuf_serializer(my_record, SerializationContext(topic, MessageField.VALUE)),
                     on_delivery=delivery_report)
    producer.poll(0)
    time.sleep(0.001)

producer.flush()  # make sure everything queued has actually been sent
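The my_record_pb2 module imported above is the Python binding generated from my_record.proto. A minimal sketch of generating it from Python, assuming grpcio-tools is installed (the plain protoc CLI works just as well):

from grpc_tools import protoc

# Writes my_record_pb2.py into the current directory
protoc.main([
    'protoc',           # dummy argv[0]
    '-I.',              # look for imports in the current directory
    '--python_out=.',
    'my_record.proto',
])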
Check the data in PG:
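A minimal way to inspect the sink table from Python, assuming psycopg2 is installed and reusing the connection settings from sink-quickstart-Postgres.properties above:

import psycopg2

conn = psycopg2.connect(host='172.31.112.1', port=5444,
                        user='postgres', password='postgres', dbname='postgres')
with conn, conn.cursor() as cur:
    # auto.create=true creates the table named by table.name.format
    cur.execute('SELECT count(*) FROM transactions_proto_data')
    print('rows:', cur.fetchone()[0])
    cur.execute('SELECT * FROM transactions_proto_data ORDER BY id DESC LIMIT 5')
    for row in cur.fetchall():
        print(row)
conn.close()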
Penetration Delay Measurement
The average end-to-end delay per message is only about 25 ms! (producer send -> kafka -> kafka connect -> postgres -> read).
The IDs in this graph are not contiguous because the while loop in my speed-test script sometimes cannot keep up with the rate at which rows are inserted into PG. Out of 5000 insertions, the while loop only caught up 652 times.
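The speed-test script itself is not reproduced here; the sketch below shows the idea, with column names assumed from the Protobuf fields above: poll the sink table in a loop, and whenever a new id appears, compare its timestamp field (written by the producer with time.time_ns()) against the current time.

import time
import psycopg2

conn = psycopg2.connect(host='172.31.112.1', port=5444,
                        user='postgres', password='postgres', dbname='postgres')
conn.autocommit = True
cur = conn.cursor()

last_seen = None
while True:  # stop with Ctrl-C
    cur.execute('SELECT id, "timestamp" FROM transactions_proto_data ORDER BY "timestamp" DESC LIMIT 1')
    row = cur.fetchone()
    if row and row[0] != last_seen:
        last_seen = row[0]
        delay_ms = (time.time_ns() - row[1]) / 1e6  # producer wrote nanoseconds
        print(last_seen, f'{delay_ms:.1f} ms')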
ByteArray Serializer Test
bash bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic transactions-proto --from-beginning
As you can see, compared to the ByteArray serializer, the message serialized with the Protobuf serializer has extra bytes added to the value. This corresponds to what the documentation says: