Over the weekend, I tried using Python to write a producer and a consumer for Apache Kafka. I found the kafka-python library, which makes this easy. However, sending Avro data from a producer to a consumer is harder: you have to understand how both Kafka and Avro work. There are plenty of specifications, but no example source code. So here is a simple example that creates a producer (producer.py) and a consumer (consumer.py) to stream Avro data via Kafka in Python.
The wise man never knows all, only fools know everything.
To run this source code, make sure you have installed Kafka (https://sonnguyen.ws/install-apache-kafka-in-ubuntu-14-04/) and the Python libraries kafka-python and avro (io is part of the standard library). I am using Python 2.7.
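Both scripts load an Avro schema from user.avsc, which is not reproduced in this post. As a minimal sketch, the following writes a schema whose field names match the record the producer sends. The field types, the namespace, and the write_schema helper are assumptions made for illustration; the schema in the repository may differ, for example by using union types.

```python
import json

# Assumed schema: the field names come from the record the producer writes;
# the types and namespace are guesses, so check them against the real user.avsc.
USER_SCHEMA = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_color", "type": "string"},
        {"name": "favorite_number", "type": "int"},
    ],
}


def write_schema(path="user.avsc"):
    """Write the schema as JSON, the format Avro schema files use."""
    with open(path, "w") as handle:
        json.dump(USER_SCHEMA, handle, indent=2)
    return path
```

Calling write_schema() with no arguments creates user.avsc in the current directory, which is where producer.py and consumer.py look for it.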
Create producer.py
    from kafka import SimpleProducer, KafkaClient
    import avro.schema
    import io, random
    from avro.io import DatumWriter

    # To send messages synchronously
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)

    # Kafka topic
    topic = "my-topic"

    # Path to user.avsc avro schema
    schema_path = "user.avsc"
    schema = avro.schema.parse(open(schema_path).read())

    for i in xrange(10):
        writer = avro.io.DatumWriter(schema)
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        writer.write({"name": "123", "favorite_color": "111", "favorite_number": random.randint(0, 10)}, encoder)
        raw_bytes = bytes_writer.getvalue()
        producer.send_messages(topic, raw_bytes)
Create consumer.py
    from kafka import KafkaConsumer
    import avro.schema
    import avro.io
    import io

    # To consume messages
    consumer = KafkaConsumer('my-topic',
                             group_id='my_group',
                             bootstrap_servers=['localhost:9092'])

    schema_path = "user.avsc"
    schema = avro.schema.parse(open(schema_path).read())

    for msg in consumer:
        bytes_reader = io.BytesIO(msg.value)
        decoder = avro.io.BinaryDecoder(bytes_reader)
        reader = avro.io.DatumReader(schema)
        user1 = reader.read(decoder)
        print user1
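The consumer has to load the same user.avsc as the producer because Avro's binary encoding carries no field names or type tags: the values are simply written one after another in schema order. The sketch below reproduces that encoding by hand with only the standard library, to show what travels through Kafka. It assumes three plain, non-union fields (string, string, int); if the real schema uses unions, each value gains a leading branch index. The helper names are hypothetical and are not part of the avro library.

```python
def zigzag_varint(n):
    """Encode an integer the way Avro encodes int and long values."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes get small codes
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(text):
    """An Avro string is its UTF-8 byte length followed by the bytes."""
    data = text.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_user(name, favorite_color, favorite_number):
    # Fields are concatenated in schema order, with no names in between.
    return (encode_string(name)
            + encode_string(favorite_color)
            + zigzag_varint(favorite_number))


def read_varint(buf, pos):
    """Read one zigzag varint from buf at pos; return (value, new_pos)."""
    shift = 0
    z = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def read_string(buf, pos):
    length, pos = read_varint(buf, pos)
    return bytes(buf[pos:pos + length]).decode("utf-8"), pos + length


def decode_user(raw):
    """Decode the bytes; this only works if the field order is known."""
    buf = bytearray(raw)
    name, pos = read_string(buf, 0)
    color, pos = read_string(buf, pos)
    number, pos = read_varint(buf, pos)
    return {"name": name, "favorite_color": color, "favorite_number": number}
```

Under these assumptions, encode_user("123", "111", 7) yields nine bytes: a length byte and three characters for each string, then one byte for the number. Nothing in those bytes says which field is which, so a consumer reading them with a different schema would misinterpret the message.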
Time to test:
    # run consumer
    python consumer.py

    # run producer
    python producer.py
I hope this post helps you say “Hello” to Kafka, Python and Avro.
Please see the details on GitHub: https://github.com/thanhson1085/python-kafka-avro
In the source code repository above, I also created consumer_bottledwater-pg.py to decode Avro data pushed from the bottledwater-pg Kafka producer. It is based on a question on Stack Overflow.