Kafka: команды и настройки


В заметке собраны команды и настройки для Kafka, которые я часто использую.

Утилиты

kcat

Если вы часто работаете с Kafka, то вы скорее всего знакомы с официальным cli клиентом:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test --from-beginning
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Существует более лаконичный cli клиент kcat:

kcat -C -b localhost -t test.test
kcat -L -b localhost

kcat работает в разы быстрее официального клиента

Основные команды

Управление топиками:

./bin/kafka-topics.sh --bootstrap-server 10.10.100.1:6667 --create --topic test.test --partitions 2 --replication-factor 1

./bin/kafka-topics.sh --bootstrap-server 10.10.100.1:6667 --delete --topic test.test

Список топиков:

./bin/kafka-topics.sh --bootstrap-server=10.10.100.1:6667 --list
kcat -L -b 10.10.100.1:6667

Описание топика:

./bin/kafka-topics.sh --bootstrap-server=10.10.100.1:6667 --describe --topic test.test

Запись в топик

./bin/kafka-console-producer.sh --bootstrap-server 10.10.100.1:6667 --topic test.test
kcat -P -b localhost -t test.test

Чтение из топика

./bin/kafka-console-consumer.sh --bootstrap-server 10.10.100.1:6667 --topic test.test
kcat -C -b localhost -t test.test

Изменение конфигов топика

./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.ms=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.bytes=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.ms=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config cleanup.policy=compact
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --delete-config retention.bytes --delete-config retention.ms

Аутентификация

Конфиг для java

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

Конфиг в виде property

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";

Пример запроса с аутентификацией

Получение сообщений (consumer)

kcat -C -b 10.10.100.1:6667 -X sasl.username=user -X sasl.password=password -t test.test -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256

Отправка сообщений (producer)

kcat -P -b 10.10.100.1:6667 -X sasl.username=user -X sasl.password=password -t test.test -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256