Kafka on Python

Environments

  • Redhat 6.6 (CDH5.3 installed)
    • python 2.7.11 / python3.5.1
  • Redhat 7.3
    • python 2.7.5 / python3.6.1
  • kafka 2.11-0.10.2.0; Download
    • Quickstart
    • execution

      $ /path/to/kafka-console-consumer.sh \
        --bootstrap-server <server>:<port> \
        --topic <topic> \
        [--from-beginning|--partition <partition> --offset <offset>]
      
      • --offset only works together with --partition, so specify both
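The rule above can be sketched as a small helper that builds the argument list for the console consumer and refuses an offset without a partition. The function name console_consumer_args is hypothetical and the script path is the same placeholder used above. Only the flags shown in the usage line are used.

```python
def console_consumer_args(bootstrap_server, topic, from_beginning=False,
                          partition=None, offset=None):
    """Build an argv list for kafka-console-consumer.sh (hypothetical helper)."""
    # The console consumer needs --partition whenever --offset is given.
    if offset is not None and partition is None:
        raise ValueError("--offset requires --partition")

    args = [
        "/path/to/kafka-console-consumer.sh",  # placeholder path, as in the notes
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
    ]
    if from_beginning:
        args.append("--from-beginning")
    if partition is not None:
        args += ["--partition", str(partition)]
    if offset is not None:
        args += ["--offset", str(offset)]
    return args


# Example: the list can be handed to subprocess.run(args) on a machine with Kafka.
# args = console_consumer_args("localhost:9092", "fast-messages", partition=0, offset=5)
```

Building a list rather than a single string avoids shell quoting problems when the topic name contains unusual characters.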

ref

Library

  • confluent-kafka
  • based on librdkafka
  • installation

    $ sudo yum install librdkafka-devel python-devel
    $ pip[3] install confluent-kafka
    
  • ref
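A minimal consumer sketch for confluent-kafka, assuming the package installed above and a broker reachable at localhost:9092. The helper names consumer_config and consume, the group id, and the poll limit are all illustrative choices, not part of the library. The import sits inside the function so the config helper can be used without the library present.

```python
def consumer_config(bootstrap_servers, group_id):
    """Return a librdkafka-style configuration dict (hypothetical helper)."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
    }


def consume(topic, bootstrap_servers="localhost:9092",
            group_id="example-group", max_polls=10):
    """Poll a topic a bounded number of times and print each payload."""
    # Imported lazily so consumer_config() works without confluent-kafka installed.
    from confluent_kafka import Consumer

    consumer = Consumer(consumer_config(bootstrap_servers, group_id))
    consumer.subscribe([topic])
    try:
        for _ in range(max_polls):
            msg = consumer.poll(1.0)  # wait up to one second for a message
            if msg is None:
                continue              # nothing arrived within the timeout
            if msg.error():
                print("consumer error: {}".format(msg.error()))
                continue
            print(msg.value())
    finally:
        consumer.close()              # leave the group cleanly


# Example (needs a running broker): consume("fast-messages")
```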
  • kafka-docker
  • kafka-python
    • practice from Python Kafka Code Example
      • installation: pip install kafka-python (or pip3 install kafka-python)
        1. start the kafka server and create a topic
        $ ./bin/kafka-server-start.sh config/server.properties &
        $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
        $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
        fast-messages
        
        2. Consumer
        from kafka import KafkaConsumer
        consumer = KafkaConsumer('fast-messages', bootstrap_servers='localhost:9092')
        # alternative: join a consumer group and subscribe explicitly
        # consumer = KafkaConsumer(group_id='some group', bootstrap_servers=['some server:9092'])
        # consumer.subscribe(['some topic'])
        for message in consumer:
          print(message)
        
        3. Producer
        from kafka import KafkaProducer
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        for i in range(1000):
          producer.send('fast-messages', key=str.encode('key_{}'.format(i)), value=b'some_message_bytes')
        # send() is asynchronous; flush so buffered messages go out before the script exits
        producer.flush()
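The --partition and --offset flags of the console consumer have a rough counterpart in kafka-python: assign a partition manually, then seek. The sketch below assumes a broker at localhost:9092 and the fast-messages topic created above. The function names describe and read_from are hypothetical, and the 5 second timeout is an arbitrary choice.

```python
def describe(offset, key, value):
    """Format one record as text; the key may be None for unkeyed messages."""
    key_text = key.decode("utf-8") if key is not None else None
    return "{} {} {}".format(offset, key_text, value.decode("utf-8"))


def read_from(topic, partition, offset, bootstrap_servers="localhost:9092"):
    """Print records of one partition, starting at the given offset."""
    # Imported lazily so describe() works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        consumer_timeout_ms=5000,  # stop iterating after 5 s without messages
    )
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])          # manual assignment, no group rebalancing
    consumer.seek(tp, offset)      # start reading at this offset
    try:
        for message in consumer:
            print(describe(message.offset, message.key, message.value))
    finally:
        consumer.close()


# Example (needs a running broker): read_from("fast-messages", 0, 100)
```

As with the console tool, the offset only has meaning within a single partition, which is why the partition has to be assigned first.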
        
      • troubleshooting: when a "kafka topic already exists" error occurs, delete the topic in ZooKeeper, following How to Delete a topic in apache kafka

        $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
        fast-messages
        
        $ sh /usr/lib/zookeeper/bin/zkCli.sh -server localhost:2181
        ...
        [zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
        [fast-messages]
        [zk: localhost:2181(CONNECTED) 1] rmr /brokers/topics/fast-messages
        [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
        []
        
        $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
        
    • ref
  • pykafka
    • installation

      $ sudo yum install snappy-devel       # Redhat
      $ sudo apt-get install libsnappy-dev  # Ubuntu
      
      $ pip[3] install python-snappy
      $ pip[3] install pykafka
      
    • Consumer

      # python 3.6.1
      from pykafka import KafkaClient
      
      # the topic name must be bytes; a bytearray doesn't work
      bootstrap_server, my_topic = '{}:{}'.format('x.y.z.w', '1234'), bytes('some_topic', 'utf8')
      
      client = KafkaClient(bootstrap_server)
      topic = client.topics[my_topic]
      consumer = topic.get_simple_consumer()
      for message in consumer:
        print(message.value)
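A matching producer sketch for pykafka, assuming the same kind of host:port string and topic as the consumer above. The helper names to_topic_name and produce_one are hypothetical. The conversion helper reflects the note that the topic lookup wants bytes rather than a bytearray.

```python
def to_topic_name(name):
    """Return the topic name as bytes, the type used for the lookup above."""
    if isinstance(name, (bytes, bytearray)):
        return bytes(name)          # bytearray is converted, not passed through
    if isinstance(name, str):
        return name.encode("utf8")
    raise TypeError("topic name must be str, bytes or bytearray")


def produce_one(hosts, topic_name, payload):
    """Send a single bytes payload and wait for the broker to acknowledge it."""
    # Imported lazily so to_topic_name() works without pykafka installed.
    from pykafka import KafkaClient

    client = KafkaClient(hosts=hosts)
    topic = client.topics[to_topic_name(topic_name)]
    # The sync producer blocks until each message is confirmed.
    with topic.get_sync_producer() as producer:
        producer.produce(payload)


# Example (needs a running broker):
# produce_one("x.y.z.w:1234", "some_topic", b"hello")
```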
      
    • failed to run on Docker: every time I run my test program, the local ports it opens are different, as shown below, so I don’t know which ports to open when running Docker

      $ python3 test_pykafka.py
      $ netstat -anp | grep -i python3
      (Not all processes could be identified, non-owned process info
       will not be shown, you would have to be root to see it all.)
      tcp        0      0 10.61.26.109:55584      10.195.23.74:3306       ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:42717      10.60.28.89:9092        ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:39935      10.60.5.205:9092        ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:38610      10.60.29.145:9092       ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:53859      10.60.29.83:9092        ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:55582      10.195.23.74:3306       ESTABLISHED 17084/python3
      tcp        0      0 10.61.26.109:43787      10.60.1.19:9092         ESTABLISHED 17084/python3
      
      # run again
      $ python3 test_pykafka.py
      $ netstat -anp | grep -i python3
      (Not all processes could be identified, non-owned process info
       will not be shown, you would have to be root to see it all.)
      tcp        0      0 10.61.26.109:43546      10.61.26.109:8202       ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:59354      10.60.29.145:9092       ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:40407      10.60.1.19:9092         ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:40749      10.60.28.89:9092        ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:53573      10.60.5.205:9092        ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:48598      10.60.29.83:9092        ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:60866      10.195.23.74:3306       ESTABLISHED 29088/python3
      tcp        0      0 10.61.26.109:60864      10.195.23.74:3306       ESTABLISHED 29088/python3
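One possible reading of the netstat output is that the changing numbers are the client's ephemeral source ports for outbound connections, while the remote ports (9092, 3306) stay fixed. If that is the case, those local ports would not need to be published in Docker. This is an interpretation, not something verified against the original setup. The standard-library sketch below only demonstrates the general behaviour; observe_client_ports is a hypothetical name.

```python
import socket


def observe_client_ports(n=3):
    """Open n connections to one local listener; return (server_port, pairs).

    Each pair is (local_port, remote_port) as seen from the client side.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
    server.listen(n)
    server_port = server.getsockname()[1]

    clients = []
    try:
        for _ in range(n):
            clients.append(socket.create_connection(("127.0.0.1", server_port)))
        # Local ports are chosen by the OS; the remote port is always the listener's.
        pairs = [(c.getsockname()[1], c.getpeername()[1]) for c in clients]
    finally:
        for c in clients:
            c.close()
        server.close()
    return server_port, pairs


if __name__ == "__main__":
    port, pairs = observe_client_ports()
    print("listener port:", port)
    for local, remote in pairs:
        print("local", local, "-> remote", remote)
```

Running it twice typically prints different local ports each time, much like the two netstat listings above.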
      
    • ref

Written on June 7, 2017