kafka

https://kafka.apache.org/

Prerequisites

Get the install_java_bin shell script.

Download the Kafka binary package:

$ curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.0/kafka_2.13-3.9.0.tgz

Verify the checksum:

$ sha512sum kafka_2.13-3.9.0.tgz
5324c1f44d4c84ea469712c2cc3d2d15545c3716edbb5353722df9c661fcc78b031fcf07d1c4f0309c5fdb32686665dfb0cffe55210cd3a1fe2a370538cb4e6d  kafka_2.13-3.9.0.tgz
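
The comparison can also be left to sha512sum itself rather than done by eye. The following is a minimal sketch; verify_sha512 is a hypothetical helper name, and the expected hash should ideally come from the .sha512 file published by Apache rather than from the mirror.

```shell
# Hypothetical helper: succeed only if FILE ($1) hashes to the expected
# SHA-512 hex digest ($2).
verify_sha512() {
    # "sha512sum -c" reads "<hash>  <file>" lines (two spaces) from stdin;
    # --quiet suppresses the "OK" line, so only failures are printed.
    printf '%s  %s\n' "$2" "$1" | sha512sum -c --quiet -
}

# Usage, with the expected digest stored in a variable:
#   verify_sha512 kafka_2.13-3.9.0.tgz "$expected" && echo "checksum OK"
```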

Install and start ZooKeeper first.

Deploy

Install the Kafka package on each node:

$ install_java_bin kafka kafka_2.13-3.9.0.tgz /opt
$ sudo chown ubuntu:ubuntu /opt/kafka

Configure

Edit file /opt/kafka/config/server.properties:

 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
+log.dirs=/opt/tmp/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
 # the brokers.
-num.partitions=1
+num.partitions=3
 
 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
 # This value is recommended to be increased for installations with data dirs located in RAID array.
@@ ... @@
 # from the end of the log.
 
 # The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours=168
+log.retention.hours=24
 
 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
 # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
@@ ... @@
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zookeeper.connect=localhost:2181
+zookeeper.connect=las0:2181,las1:2181,las2:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=18000
@@ ... @@
 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
 group.initial.rebalance.delay.ms=0
+
+delete.topic.enable=true

This file needs to be copied to the same path on all nodes. On each node, the broker.id option in the config file must then be set to a unique number. For example, on host las2:

$ sudo sed -i -e '/^broker\.id=/c\broker.id=2' /opt/kafka/config/server.properties
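
When the host names end in a numeric index, as las0 to las2 do here, the broker id can be derived from the host name instead of being typed on each node. This is only a sketch under that naming assumption; broker_id_from_host is a hypothetical helper, not part of Kafka.

```shell
# Hypothetical helper: print the trailing digits of a host name (las2 -> 2).
# Uses the local short host name when no argument is given.
broker_id_from_host() {
    host=${1:-$(hostname -s)}
    # Delete everything up to and including the last non-digit character.
    id=$(printf '%s' "$host" | sed 's/.*[^0-9]//')
    # Fail (print nothing) if the name has no numeric suffix.
    [ -n "$id" ] && printf '%s\n' "$id"
}

# Usage on each node:
#   id=$(broker_id_from_host) &&
#   sudo sed -i -e "/^broker\.id=/c\\broker.id=$id" /opt/kafka/config/server.properties
```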

Create the directory named by log.dirs on each node:

$ sudo mkdir -p /opt/tmp/kafka-logs
$ sudo chown ubuntu:ubuntu /opt/tmp/kafka-logs
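
To avoid repeating the path by hand, the value can be read back from the config file. The sketch below uses a hypothetical helper, get_prop, and assumes log.dirs holds a single directory (the option also accepts a comma-separated list, which this does not split).

```shell
# Hypothetical helper: print the value of KEY ($2) from a .properties
# file ($1). Commented-out lines are ignored; the last match wins.
get_prop() {
    awk -F= -v k="$2" '
        $1 == k { sub(/^[^=]*=/, ""); v = $0 }   # keep text after the first "="
        END     { print v }
    ' "$1"
}

# Usage:
#   dir=$(get_prop /opt/kafka/config/server.properties log.dirs)
#   sudo mkdir -p "$dir" && sudo chown ubuntu:ubuntu "$dir"
```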

Run

Check the version:

$ kafka-configs.sh --version
3.9.0

Start the services on each node:

$ kafka-server-start.sh -daemon /opt/kafka/config/server.properties

Show the Java processes:

$ jps -lm
3332661 kafka.Kafka /opt/kafka/config/server.properties
3335576 sun.tools.jps.Jps -lm
3178940 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
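
For scripts, the same check can be reduced to an exit status. This is a minimal sketch that assumes the broker appears in the jps output with the main class kafka.Kafka, as it does above; kafka_pid is a hypothetical helper name.

```shell
# Hypothetical helper: read "jps -lm" output on stdin, print the PID of
# every kafka.Kafka process, and return non-zero if none is found.
kafka_pid() {
    awk '$2 == "kafka.Kafka" { print $1; found = 1 } END { exit !found }'
}

# Usage:
#   jps -lm | kafka_pid || echo "broker is not running" >&2
```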

Stop the services on each node:

$ kafka-server-stop.sh

Usage

Create, list and show details of a topic:

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic xxxx --partitions 3 --replication-factor 1
Created topic xxxx.
$ kafka-topics.sh --bootstrap-server localhost:9092 --list
xxxx
$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic xxxx
Topic: xxxx TopicId: 6LCwAbkHTlSz9usG_YDaaw PartitionCount: 3   ReplicationFactor: 1    Configs: 
    Topic: xxxx Partition: 0    Leader: 1   Replicas: 1 Isr: 1  Elr: N/A    LastKnownElr: N/A
    Topic: xxxx Partition: 1    Leader: 0   Replicas: 0 Isr: 0  Elr: N/A    LastKnownElr: N/A
    Topic: xxxx Partition: 2    Leader: 2   Replicas: 2 Isr: 2  Elr: N/A    LastKnownElr: N/A
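
The commands in this section talk to localhost:9092, which only works on a broker node. From elsewhere, a list of several brokers is usually passed to --bootstrap-server so that one unavailable node does not block the client. The sketch below builds such a list; it assumes every broker listens on port 9092 and is reachable under its host name, and bootstrap_servers is a hypothetical helper.

```shell
# Hypothetical helper: join host names into "host:9092,host:9092,...".
bootstrap_servers() {
    port=9092   # assumed to be the listener port on every broker
    list=
    for host in "$@"; do
        # Add a comma separator only when the list is already non-empty.
        list="${list:+$list,}$host:$port"
    done
    printf '%s\n' "$list"
}

# Usage:
#   kafka-topics.sh --bootstrap-server "$(bootstrap_servers las0 las1 las2)" --list
```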

Delete the topic:

$ kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic xxxx

Produce messages:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic xxxx

Consume messages:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic xxxx

List the consumer groups:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-79209