В заметке собраны команды и настройки для Kafka, которые я часто использую.
Утилиты
kcat
Если вы часто работаете с Kafka, то вы скорее всего знакомы с официальным cli клиентом:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test --from-beginning
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Существует более лаконичный cli клиент kcat:
kcat -C -b localhost -t test.test
kcat -L -b localhost
kcat работает в разы быстрее официального клиента
Основные команды
Управление топиками:
./bin/kafka-topics.sh --bootstrap-server 10.10.100.1:6667 --create --topic test.test --partitions 2 --replication-factor 1
./bin/kafka-topics.sh --bootstrap-server 10.10.100.1:6667 --delete --topic test.test
Список топиков:
./bin/kafka-topics.sh --bootstrap-server=10.10.100.1:6667 --list
kcat -L -b 10.10.100.1:6667
Описание топика:
./bin/kafka-topics.sh --bootstrap-server=10.10.100.1:6667 --describe --topic test.test
Запись в топик
./bin/kafka-console-producer.sh --bootstrap-server 10.10.100.1:6667 --topic test.test
kcat -P -b localhost -t test.test
Чтение из топика
./bin/kafka-console-consumer.sh --bootstrap-server 10.10.100.1:6667 --topic test.test
kcat -C -b localhost -t test.test
Изменение конфигов топика
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.ms=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.bytes=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config retention.ms=-1
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --add-config cleanup.policy=compact
./bin/kafka-configs.sh --bootstrap-server 10.10.100.1:6667 --alter --entity-type topics --entity-name test.test --delete-config retention.bytes --delete-config retention.ms
Аутентификация
Конфиг для java
Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
Конфиг в виде property
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
Пример запроса с аутентификацией
Получение сообщений (consumer)
kcat -C -b 10.10.100.1:6667 -X sasl.username=user -X sasl.password=password -t test.test -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256
Отправка сообщений (producer)
kcat -P -b 10.10.100.1:6667 -X sasl.username=user -X sasl.password=password -t test.test -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256