cloud · Series 1/5 · August 14, 2022 · 8 min read
Apache Kafka

Kafka CLI Commands Cheat Sheet

A collection of frequently used Kafka CLI commands.

Frank Advenoh
#kafka #script #cli

When working with Kafka, I mostly rely on the AKHQ UI, but for troubleshooting or writing scripts I often use the Kafka CLI as well. This post collects the Kafka CLI commands I use most often.

For how to run Kafka in a local environment, please refer to the previous post.

1. Download Kafka

Download the latest Kafka binary file from the link below.

$ cd src
$ wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
$ tar -zxvf kafka_2.13-3.2.1.tgz

2. Kafka CLI

Kafka's default port is 9092, but the examples here use the port configured in Running Kafka in a Local Environment (localhost:29092).

Before Kafka v2.2, the CLI tools took the Zookeeper address and port (e.g., --zookeeper localhost:2181). From v2.2 onward, the --bootstrap-server option is recommended, and as of v3.0 the --zookeeper option has been removed from kafka-topics.sh.

If you use the Kafka CLI frequently, it's recommended to add it to the PATH environment variable. That way you don't have to navigate to the Kafka binary folder every time to enter a command.

$ vim ~/.zshrc
...omitted...

# Kafka cli
export PATH="/Users/user/src/kafka_2.13-3.2.1/bin:$PATH"

$ source ~/.zshrc
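
To confirm that the PATH change took effect, a quick check along these lines may help. This is a minimal sketch: kafka_cli_on_path is a hypothetical helper name, and it only tests whether kafka-topics.sh can be resolved through PATH.

```shell
# Hypothetical helper: succeeds when kafka-topics.sh is reachable through PATH.
kafka_cli_on_path() {
  command -v kafka-topics.sh >/dev/null 2>&1
}

if kafka_cli_on_path; then
  echo "kafka-topics.sh found at: $(command -v kafka-topics.sh)"
else
  echo "kafka-topics.sh not found; check the PATH entry in ~/.zshrc"
fi
```

If the script is not found, the path in the export line most likely does not match the folder where the archive was extracted.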

2.1 Topics

2.1.1 List Topics

$ kafka-topics.sh --bootstrap-server localhost:29092 --list
__connect-config
__connect-offsets
__connect-status
__consumer_offsets
_schemas
frank
test

2.1.2 Create a Topic

$ kafka-topics.sh --bootstrap-server localhost:29092 --replication-factor 1 --partitions 1 --topic my_topic --create
Created topic my_topic.

2.1.3 View Topic Information

$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic	TopicId: Zlpf9YfsSRO07grMU3MZlA	PartitionCount: 1	ReplicationFactor: 1	Configs: compression.type=gzip
	Topic: my_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

2.1.4 Delete a Topic

$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --delete

2.2 Producer

$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic

Once the > prompt below appears, you can send data to the topic. (Pressing the Enter key sends it.)

> 

To exit the producer console, press Ctrl+C.

2.2.1 How do you produce a message together with a key in kafka-console-producer.sh?

By default, a message sent to a Kafka topic from the console producer has a null key. To send a message together with a key, set the parse.key and key.separator properties. In this example, : is used as the separator.

$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic --property parse.key=true --property key.separator=:
> key:value
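
When keyed messages come from a file or another command rather than the interactive prompt, a line with no separator cannot be split into a key and a value. The sketch below filters such lines out before they reach the producer. keyed_lines is a hypothetical helper, not part of the Kafka CLI, and the sample input is made up for illustration.

```shell
# Hypothetical helper: pass through only lines that contain the key separator
# (with a non-empty key) and report the others on stderr.
keyed_lines() {
  awk -v sep="${1:-:}" '
    index($0, sep) > 1 { print; next }
    { print "skipped (no key): " $0 > "/dev/stderr" }
  '
}

# Demo on inline sample data; the second line has no separator and is dropped.
printf 'user1:hello\nno separator here\nuser2:world\n' | keyed_lines ":"
```

In a script, the filtered output could then be piped into the producer command shown above, for example keyed_lines ":" < messages.txt | kafka-console-producer.sh ... --property parse.key=true --property key.separator=: (messages.txt is an assumed file name).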

2.3 Consumer

Here are things you should know when using the kafka-console-consumer.sh command.

  • Unless you specify the --from-beginning option, only messages produced after the consumer starts are printed
  • If the topic does not exist, it is created automatically by default (broker setting auto.create.topics.enable=true)
  • By specifying multiple topics separated by commas, you can consume from several topics at once
  • If you don't specify a consumer group, kafka-console-consumer creates a random consumer group
  • The order of messages may not be guaranteed
    • Message order is guaranteed only at the partition level, not at the topic level

The kafka-console-consumer.sh command options are as follows.

  • --from-beginning
    • Lets you receive messages from the beginning
  • --group
    • If you don't specify a consumer group, a random consumer group ID is automatically generated by default
  • --partition
    • Use this option to consume from a specific partition only

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic

2.3.2 How do you print the key and value?

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning
CreateTime:1660481815124	null	hello world
CreateTime:1660481829859	null	asdf
CreateTime:1660481833837	null	hello world
CreateTime:1660481919699	null	sdfj sdf
CreateTime:1660481923366	null	hello
CreateTime:1660481924547	null	asdf
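
The output above is tab-separated, which makes it easy to post-process in a script. As a minimal sketch, the hypothetical helper below turns the CreateTime field (epoch milliseconds) into epoch seconds and leaves the key and value columns untouched.

```shell
# Hypothetical helper: convert "CreateTime:<epoch ms><TAB>key<TAB>value"
# lines into "<epoch s><TAB>key<TAB>value".
strip_create_time() {
  awk -F '\t' 'BEGIN { OFS = "\t" } {
    sub(/^CreateTime:/, "", $1)             # drop the label, keep the digits
    $1 = substr($1, 1, length($1) - 3)      # cut the last three digits (ms -> s)
    print
  }'
}

# Demo on one line copied from the output above.
printf 'CreateTime:1660481815124\tnull\thello world\n' | strip_create_time
```

The resulting seconds value can be passed to a date tool for a readable timestamp; with GNU date that would be date -u -d @1660481815.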

2.4 Consumer Group

To explore the consumer group feature, create a topic with at least 2 partitions. Things to know when using consumer groups are as follows.

  • Consumers beyond the number of partitions stay idle, so a group effectively cannot have more active consumers than the topic has partitions. (# of active consumers <= # of partitions)
  • If you consume with the --group option and later run the same consumer group with the --from-beginning option, the option is ignored because the group already has committed offsets. In this case, you need to reset the consumer group's offsets
  • If you don't specify the --group option, a random consumer group such as console-consumer-11984 is created
  • If a single consumer receives all messages, the topic was probably created with only 1 partition.

Create a topic with 3 partitions.

$ kafka-topics.sh --bootstrap-server localhost:29092 --replication-factor 1 --partitions 3 --topic my_topic --create

Open 2 terminal windows and start each consumer in the consumer group using the --group option.

# console 1
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application 

# console 2
$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application 

If you send messages to the topic, you'll see them split between the two consumers, depending on which partition each message lands in.

$ kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my_topic
> 11
> 22

2.5 Consumer Group Management

This section covers how to reset a Kafka consumer group.

  • You cannot reset a consumer group while there are active consumers
  • Resetting is used to reprocess the data of a consumer group (e.g., when there is a bug fix)
  • It is configured with the --reset-offsets option
  • Offset reset strategy options
    • --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current

The kafka-consumer-groups.sh command options are as follows.

  • --dry-run
    • Only shows the result of execution and does not actually perform the command
  • --all-groups
    • Use with caution since it can apply the reset offset to all groups
  • --all-topics
    • Use with caution since it can apply the reset offset to all topics
  • --by-duration
    • Resets the offset to the position a given duration before the current time (format: PnDTnHnMnS)
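
One cautious way to combine these options is to preview a reset with --dry-run and only then repeat it with --execute. The sketch below only prints the command line so that it can be reviewed first. reset_offsets_cmd and its argument order are hypothetical, and the bootstrap address is the one used throughout this post.

```shell
# Hypothetical helper: print a reset command for a group and topic.
# The third argument defaults to --dry-run; pass --execute to apply the reset.
reset_offsets_cmd() {
  group="$1"; topic="$2"; mode="${3:---dry-run}"
  echo "kafka-consumer-groups.sh --bootstrap-server localhost:29092" \
       "--group $group --topic $topic --reset-offsets --to-earliest $mode"
}

reset_offsets_cmd my-first-application my_topic             # preview only
reset_offsets_cmd my-first-application my_topic --execute   # apply the reset
```

Defaulting to --dry-run means that forgetting the third argument shows the planned offsets instead of changing them.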

2.5.1 Reset the offset with the --to-earliest option

First, check that there are no active consumers.

$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group my-first-application
Consumer group 'my-first-application' has no active members.
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
my-first-application my_topic        0          18              18              0               sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1     sarama
my-first-application my_topic        1          8               8               0               sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1     sarama
my-first-application my_topic        2          9               9               0               sarama-8e27bd86-2a35-4c2b-b127-24b69923d171 /172.19.0.1     sarama
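
In a script, the same check can be automated by looking for the "has no active members" notice in the describe output. This is a rough sketch: group_is_idle is a hypothetical helper, and it assumes the notice text matches the output shown above.

```shell
# Hypothetical helper: succeeds when the describe output read from stdin
# contains the "has no active members" notice.
group_is_idle() {
  grep -q "has no active members"
}

# Demo on the notice line from the output above.
if printf "Consumer group 'my-first-application' has no active members.\n" | group_is_idle; then
  echo "no active consumers; safe to reset offsets"
fi
```

When piping the real command into it, adding 2>&1 to the describe command is a reasonable precaution in case the notice is written to stderr rather than stdout.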

To re-read the entire topic, change the offset to the earliest position.

$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group my-first-application --reset-offsets --to-earliest --execute --topic my_topic

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-first-application           my_topic                       0          0
my-first-application           my_topic                       1          0
my-first-application           my_topic                       2          0

The new offset has been reset to 0 for all partitions. When you restart the consumer, it reads from the starting offset of each partition.

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
hello world
asdf
...omitted...
value

2.5.2 Reset the offset with the --shift-by option

You can also shift the current offsets by a relative amount. Here, --shift-by -2 moves each partition's offset back by 2.

$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic my_topic

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-first-application           my_topic                       0          16
my-first-application           my_topic                       1          6
my-first-application           my_topic                       2          7

When you restart the consumer, you'll see that it returns only the last 2 messages from each partition of the topic.

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my_topic --group my-first-application
200
22
180
210
44
value

3. FAQ

3.1 How to increase the number of partitions in a topic

Check the current number of partitions.

$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic	TopicId: ufrRaY-tTyqcHjFAY-q0ew	PartitionCount: 1	ReplicationFactor: 1	Configs: compression.type=gzip
	Topic: my_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

Increase the number of partitions of the my_topic topic to 3.

$ kafka-topics.sh --bootstrap-server localhost:29092 --alter --topic my_topic --partitions 3

Verify that it was applied correctly.

$ kafka-topics.sh --bootstrap-server localhost:29092 --topic my_topic --describe
Topic: my_topic	TopicId: ufrRaY-tTyqcHjFAY-q0ew	PartitionCount: 3	ReplicationFactor: 1	Configs: compression.type=gzip
	Topic: my_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: my_topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: my_topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
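
For scripted verification, the partition count can be pulled out of the describe output instead of read by eye. The sketch below assumes the output format shown above; partition_count is a hypothetical helper name.

```shell
# Hypothetical helper: print the PartitionCount value found in
# `kafka-topics.sh --describe` output read from stdin.
partition_count() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "PartitionCount:") { print $(i + 1); exit }
  }'
}

# Demo on the summary line from the output above.
printf 'Topic: my_topic\tTopicId: ufrRaY-tTyqcHjFAY-q0ew\tPartitionCount: 3\tReplicationFactor: 1\tConfigs: compression.type=gzip\n' | partition_count
```

A script could compare the printed value with the expected number of partitions after running the --alter command.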

3.2 How to delete a Consumer Group in Kafka

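Deleting a consumer group removes its committed offsets, so a consumer that later starts with the same group ID and the --from-beginning option reads the data completely from the beginning. The group must have no active consumers when you delete it.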

$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --delete --group my-first-application
Deletion of requested consumer groups ('my-first-application') was successful.

3.3 How to list all consumers in a Consumer Group

By listing all consumers in a consumer group, you can easily see where each consumer is located on the network and how much of the topic it is consuming.

$ kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group my-first-application
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
my-first-application my_topic        0          18              18              0               sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1     sarama
my-first-application my_topic        1          8               8               0               sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1     sarama
my-first-application my_topic        2          9               9               0               sarama-473590a9-11eb-40c2-afa7-70c5ec448edf /172.18.0.1     sarama
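
To get a single number for how far behind a group is, the LAG column can be summed. The sketch below assumes the column layout shown above. total_lag is a hypothetical helper, and the sample rows are illustrative values rather than output from a real cluster.

```shell
# Hypothetical helper: sum the LAG column of
# `kafka-consumer-groups.sh --describe` output read from stdin.
total_lag() {
  awk '
    $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    col && $col ~ /^[0-9]+$/ { sum += $col }   # skip rows where LAG is not a number
    END { print sum + 0 }
  '
}

# Demo on illustrative rows (each partition is 2 messages behind).
total_lag <<'EOF'
GROUP                TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-first-application my_topic  0          16              18              2    -            -     -
my-first-application my_topic  1          6               8               2    -            -     -
my-first-application my_topic  2          7               9               2    -            -     -
EOF
```

Locating the column by its header name rather than by a fixed position keeps the helper working if the column order differs.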

3.4 How to read messages from a specific offset and partition

https://developer.confluent.io/tutorials/kafka-console-consumer-read-specific-offsets-partitions/confluent.html

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic report --partition 5 --offset 373601

3.5 How to also print the message Timestamp

$ kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic report --property print.timestamp=true
