When using Kafka, I mostly rely on the AKHQ UI, but for troubleshooting or writing scripts I often use the Kafka CLI as well. This post collects the Kafka CLI commands I use most often.
For how to run Kafka in a local environment, please refer to the previous post.
1. Download Kafka
Download the latest Kafka binary file from the link below.
$ cd src
$ wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
$ tar -zxvf kafka_2.13-3.2.1.tgz
2. Kafka CLI
Kafka's default port is 9092, but the examples here use port 29092, the port configured in Running Kafka in a Local Environment.
Before Kafka v2.2, the CLI tools took the Zookeeper URL and port number (e.g., localhost:2181); from v2.2 onward, the --bootstrap-server option is recommended instead. As of v3.0, the --zookeeper option has been removed from kafka-topics.sh.
If you use the Kafka CLI frequently, it's recommended to add it to the PATH environment variable. That way you don't have to navigate to the Kafka binary folder every time to enter a command.
$ vim ~/.zshrc
...omitted...
# Kafka cli
export PATH="/Users/user/src/kafka_2.13-3.2.1/bin:$PATH"
$ source ~/.zshrc
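After reloading the shell configuration, it can be worth confirming that the scripts actually resolve. The following is a minimal sketch; the function name check_kafka_cli is my own and not part of Kafka. It only looks the scripts up on PATH and does not contact a broker.

```shell
# Report whether each Kafka CLI script used in this post resolves on PATH.
check_kafka_cli() {
  for tool in kafka-topics.sh kafka-console-producer.sh \
              kafka-console-consumer.sh kafka-consumer-groups.sh; do
    if command -v "$tool" >/dev/null 2>&1; then
      echo "found:   $(command -v "$tool")"
    else
      echo "missing: $tool"
    fi
  done
}

check_kafka_cli
```

If any line says missing, the PATH entry probably points at the wrong folder or version.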
2.1 Topics
2.1.1 List Topics
$ kafka-topics.sh --bootstrap-server localhost:29092 --list
__connect-config
__connect-offsets
__connect-status
__consumer_offsets
_schemas
frank
test
2.1.2 Create a Topic
$ kafka-topics.sh --bootstrap-server localhost:29092 --replication-factor 1 --partitions 1 --topic my_topic --create
Created topic my_topic.
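In scripts, running the create command twice fails because the topic already exists. A sketch of an idempotent variant using the --if-not-exists flag is shown here. The function name ensure_topic and the variables KAFKA_TOPICS and BOOTSTRAP_SERVER are hypothetical names I introduced so that the binary and the broker address can be overridden.

```shell
# Hypothetical helper: create a topic only if it does not exist yet.
# KAFKA_TOPICS and BOOTSTRAP_SERVER are assumed names, not Kafka settings.
KAFKA_TOPICS="${KAFKA_TOPICS:-kafka-topics.sh}"

ensure_topic() {
  topic="$1"
  partitions="${2:-1}"   # defaults to a single partition
  "$KAFKA_TOPICS" --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:29092}" \
    --create --if-not-exists \
    --topic "$topic" --partitions "$partitions" --replication-factor 1
}

# Usage: ensure_topic my_topic 3
```

Note that --if-not-exists only skips the creation; it does not change the partition count of a topic that already exists.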
2.1.3 View Topic Information
$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic TopicId: Zlpf9YfsSRO07grMU3MZlA PartitionCount: 1 ReplicationFactor: 1 Configs: compression.type=gzip
Topic: my_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2.1.4 Delete a Topic
$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --delete
2.2 Producer
$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic
Once the > prompt below appears, type a message and press Enter to send it to the topic.
>
To exit the producer console, press Ctrl+C.
2.2.1 How do you produce a message together with a key in kafka-console-producer.sh?
By default, messages sent to a Kafka topic are produced with a null key. To send a message together with a key, set the parse.key and key.separator properties. This example uses : as the separator.
$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic --property parse.key=true --property key.separator=:
> key:value
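When feeding the producer from a file instead of typing at the prompt, a line without the separator cannot be split into key and value, and as far as I know the console producer rejects it when parse.key=true. A small sketch of a pre-filter follows; keyed_only is a name I made up and messages.txt is a hypothetical input file.

```shell
# Pass through only lines that contain the key separator (":" by default).
keyed_only() {
  separator="${1:-:}"
  grep -F -- "$separator"
}

# Usage (messages.txt is a hypothetical input file):
# keyed_only < messages.txt | kafka-console-producer.sh \
#   --bootstrap-server localhost:29092 --topic my_topic \
#   --property parse.key=true --property key.separator=:
```

This silently drops lines without a key, so in a real script you may prefer to log them rather than discard them.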
2.3 Consumer
Here are some things to know when using the kafka-console-consumer.sh command.
- Unless you specify the --from-beginning option, only messages produced after the consumer starts are printed
- If the topic has not been created, it is automatically created by default
- By specifying multiple topics separated by commas, you can consume from several topics at once
- If you don't specify a consumer group, kafka-console-consumer creates a random consumer group
- The order of messages may not be guaranteed
- Message order is guaranteed only at the partition level, not at the topic level
The kafka-console-consumer.sh command options are as follows.
--from-beginning - Lets you receive messages from the beginning
--group - If you don't specify a consumer group, a random consumer group ID is automatically generated by default
--partition - Use this option to consume from a specific partition only
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic
2.3.1 How do you print the key and value?
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning
CreateTime:1660481815124 null hello world
CreateTime:1660481829859 null asdf
CreateTime:1660481833837 null hello world
CreateTime:1660481919699 null sdfj sdf
CreateTime:1660481923366 null hello
CreateTime:1660481924547 null asdf
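The CreateTime values in this output are epoch timestamps in milliseconds, which are hard to read at a glance. Here is a minimal sketch for converting one to a UTC timestamp. The function name create_time_to_utc is my own, and the code assumes GNU date; on macOS the BSD date command uses -r <seconds> instead of -d @<seconds>.

```shell
# Convert a "CreateTime:<epoch millis>" field into a UTC timestamp.
# Assumes GNU date (-d @<seconds>); BSD/macOS date uses -r <seconds> instead.
create_time_to_utc() {
  millis="${1#CreateTime:}"     # strip the "CreateTime:" prefix
  seconds=$((millis / 1000))    # drop the millisecond part
  date -u -d "@$seconds" +%Y-%m-%dT%H:%M:%SZ
}

create_time_to_utc CreateTime:1660481815124   # 2022-08-14T12:56:55Z
```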
2.4 Consumer Group
To explore the consumer group feature, create a topic with at least 2 partitions. Things to know when using consumer groups are as follows.
- A group cannot have more active consumers than the topic has partitions; any extra consumers stay idle. (# of consumers <= # of partitions)
- If you consume with the --group option and later use the --from-beginning option with the same consumer group, the option is ignored because the group already has committed offsets. In this case, you need to reset the consumer group
- If you don't specify the --group option, a random consumer group such as console-consumer-11984 is created
- If a single consumer receives all messages, the topic was probably created with only 1 partition.
Create a topic with 3 partitions.
$ kafka-topics.sh --bootstrap-server localhost:29092 --replication-factor 1 --partitions 3 --topic my_topic --create
Open 2 terminal windows and start each consumer in the consumer group using the --group option.
# console 1
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
# console 2
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
If you send messages to the topic, you'll see them split between the two consumers.
$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic
> 11
> 22
2.5 Consumer Group Management
This section covers how to reset a Kafka consumer group.
- You cannot reset a consumer group while it has active consumers
- Resetting is used to reprocess a consumer group's data (e.g., after a bug fix)
- A reset is performed with the --reset-offsets option
- Offset reset strategy options: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current
The kafka-consumer-groups.sh command options are as follows.
--dry-run - Only shows the result of the reset without actually applying it
--all-groups - Applies the reset to all groups, so use it with caution
--all-topics - Applies the reset to all topics, so use it with caution
--by-duration - Resets the offsets to a point a given duration before the current time
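Because a reset rewrites committed offsets, I find it safer to preview with --dry-run first and only then repeat the command with --execute. A sketch of a wrapper that defaults to the preview is shown here. The function name reset_to_earliest and the variables KAFKA_CONSUMER_GROUPS and BOOTSTRAP_SERVER are hypothetical names introduced for illustration.

```shell
# Hypothetical wrapper: reset a group's offsets to the earliest position.
# Defaults to --dry-run, so nothing changes unless --execute is passed.
KAFKA_CONSUMER_GROUPS="${KAFKA_CONSUMER_GROUPS:-kafka-consumer-groups.sh}"

reset_to_earliest() {
  group="$1"
  topic="$2"
  mode="${3:---dry-run}"   # third argument: --dry-run (default) or --execute
  "$KAFKA_CONSUMER_GROUPS" --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:29092}" \
    --group "$group" --topic "$topic" \
    --reset-offsets --to-earliest "$mode"
}

# Usage:
# reset_to_earliest my-first-application my_topic            # preview only
# reset_to_earliest my-first-application my_topic --execute  # apply the reset
```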
2.5.1 Reset the offset with the --to-earliest option
First, check that there are no active consumers.
$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group my-first-application
Consumer group 'my-first-application' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application my_topic 0 18 18 0 sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1 sarama
my-first-application my_topic 1 8 8 0 sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1 sarama
my-first-application my_topic 2 9 9 0 sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1 sarama
To re-read the entire topic, change the offset to the earliest position.
$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group my-first-application --reset-offsets --to-earliest --execute --topic my_topic
GROUP TOPIC PARTITION NEW-OFFSET
my-first-application my_topic 0 0
my-first-application my_topic 1 0
my-first-application my_topic 2 0
The new offset has been reset to 0 for all partitions. When you restart the consumer, it reads from the starting offset of each partition.
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
hello world
asdf
...omitted...
value
2.5.2 Reset the offset with the --shift-by option
You can also shift the offset relative to its current position. Here each partition's offset is moved back by 2 with --shift-by -2.
$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic my_topic
GROUP TOPIC PARTITION NEW-OFFSET
my-first-application my_topic 0 16
my-first-application my_topic 1 6
my-first-application my_topic 2 7
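The new offsets follow from simple arithmetic on the earlier describe output: 18 - 2 = 16, 8 - 2 = 6, and 9 - 2 = 7. The sketch below reproduces that calculation. The function shifted_offset is my own illustration, not part of Kafka, and the clamping to the valid offset range is my assumption about how out-of-range targets are handled.

```shell
# Sketch of the arithmetic behind --shift-by: new = current + shift,
# kept within [earliest, log_end]. The clamping is an assumption.
shifted_offset() {
  current="$1"; shift_by="$2"; earliest="$3"; log_end="$4"
  new=$((current + shift_by))
  if [ "$new" -lt "$earliest" ]; then new="$earliest"; fi
  if [ "$new" -gt "$log_end" ]; then new="$log_end"; fi
  echo "$new"
}

shifted_offset 18 -2 0 18   # partition 0 -> 16
shifted_offset 8 -2 0 8     # partition 1 -> 6
shifted_offset 9 -2 0 9     # partition 2 -> 7
```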
When you restart the consumer, you'll see that it returns only the last 2 messages from each partition of the topic.
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
200
22
180
210
44
value
3. FAQ
3.1 How to increase the number of partitions in a topic
Check the current number of partitions.
$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic TopicId: ufrRaY-tTyqcHjFAY-q0ew PartitionCount: 1 ReplicationFactor: 1 Configs: compression.type=gzip
Topic: my_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Increase the number of partitions of the my_topic topic to 3.
$ kafka-topics.sh --bootstrap-server localhost:29092 --alter --topic my_topic --partitions 3
Verify that it was applied correctly.
$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic TopicId: ufrRaY-tTyqcHjFAY-q0ew PartitionCount: 3 ReplicationFactor: 1 Configs: compression.type=gzip
Topic: my_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
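In a script, the same verification can be done by pulling the PartitionCount value out of the describe output. This is a minimal sketch; partition_count is a hypothetical helper name, and it assumes the output keeps the "PartitionCount: N" format shown in this section.

```shell
# Extract the PartitionCount value from `kafka-topics.sh --describe` output on stdin.
partition_count() {
  sed -n 's/.*PartitionCount: *\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Usage:
# count=$(kafka-topics.sh --bootstrap-server localhost:29092 \
#   --topic my_topic --describe | partition_count)
# [ "$count" -eq 3 ] && echo "my_topic has 3 partitions"
```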
3.2 How to delete a Consumer Group in Kafka
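Deleting a consumer group removes its committed offsets, so a new consumer using the same group ID can read the data again from the beginning. The group must have no active members when you delete it.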
$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --delete --group my-first-application
Deletion of requested consumer groups ('my-first-application') was successful.
3.3 How to list all consumers in a Consumer Group
By listing all consumers in a consumer group, you can easily see where each consumer is located on the network and how much of the topic it is consuming.
$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group my-first-application
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application my_topic 0 18 18 0 sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1 sarama
my-first-application my_topic 1 8 8 0 sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1 sarama
my-first-application my_topic 2 9 9 0 sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1 sarama
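For monitoring scripts, the per-partition LAG values are often summed into a single number. The following sketch does that with awk. The function name total_lag is my own, and it assumes LAG is the 6th whitespace-separated column, as in the output shown here.

```shell
# Sum the LAG column (6th field) of `kafka-consumer-groups.sh --describe` output on stdin.
# Rows whose LAG is not a plain number (the header, "-" placeholders) are skipped.
total_lag() {
  awk '$6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# Usage:
# kafka-consumer-groups.sh --bootstrap-server localhost:29092 \
#   --describe --group my-first-application | total_lag
```

For the output in this section, every partition has a LAG of 0, so the total is 0.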
3.4 How to read messages from a specific offset and partition
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic report --partition 5 --offset 373601
3.5 How to also print the message Timestamp
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic report --property print.timestamp=true
4. References
- https://www.conduktor.io/kafka/kafka-cli-tutorial
- https://kafka.apache.org/documentation/#basic_ops
- https://betterprogramming.pub/kafka-cli-commands-1a135a4ae1bd
- https://medium.com/@TimvanBaarsen/apache-kafka-cli-commands-cheat-sheet-a6f06eac01b
- https://akageun.github.io/2020/05/07/kafka-cli.html
- https://hevodata.com/learn/kafka-cli-commands/
- https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/kafka-commands.html