背景

以前にログをデバッグし、トピックとコンシューマー グループを作成したため、この問題に遭遇しました。その後、トピックとコンシューマー グループは不要になったため削除しました。しかし、削除コマンドではエラーは報告されませんでしたが、トピックを確認するとまだ残っていました。これによって私は落ち込んでしまいました。どうしたの?

私の使用環境は以下の通りです:

NameIPPort
kafka01.koevn.com10.100.10.19092
kafka02.koevn.com10.100.10.29092
kafka03.koevn.com10.100.10.39092
kafkaclient0110.100.8.11随机

Kafkaトピックを表示

Terminal window
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka01.koevn.com:9092,\
kafka02.koevn.com:9092,kafka03.koevn.com:9092 --list \
--command-config /opt/kafka/config/kraft/client.properties
---------------- output ------------------
__consumer_offsets
test_koevn

--command-config: パラメータを追加するための暗号化証明書のパスを指定します

今回表示されるトピックには test_koevn があり、これが削除対象のトピックになります。削除するには次のコマンドを実行します

指定されたトピックを削除する

Terminal window
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka01.koevn.com:9092,\
kafka02.koevn.com:9092,kafka03.koevn.com:9092 --delete --topic test_koevn \
--command-config /opt/kafka/config/kraft/client.properties

⚠️ 注意 3 つのブローカーを指定したことに注意してください。複数のレプリカとパーティションを持つ 3 ノードの Kafka KRaft クラスター モードを使用しているため、トピックを削除するために ‘broker’ を 1 つだけ追加すると、このノードのトピックのみが削除されます。他のノードのレプリカは引き続きデータを同期します。したがって、トピックの削除を完了するには、このノードのトピックを削除するようにすべての Kafka クラスター ノードを指定する必要があります。

コマンド実行後、ターミナルにエラーは表示されませんが、トピックを再度確認すると、test_koevn がまだ存在しており、削除に失敗したことがわかります!

削除するときに、kafka の server.properties 構成ファイルで定義されている log.dirs パスを確認すると、トピック ディレクトリにこの状況が表示されていることがわかります。

Terminal window
test_koevn-0
test_koevn-0-delete
test_koevn-1
test_koevn-1-delete
test_koevn-2
test_koevn-2-delete
test_koevn-3
test_koevn-3-delete
test_koevn-4
test_koevn-4-delete

test_koevn-0-deleteこのディレクトリはしばらくすると自動的に削除されますが、トピック削除操作を実行すると、この状況が再度発生します。

位置決めの問題

kafkaのログ/server.logログを確認すると、次のようなメッセージがあります

Terminal window
2025-03-12 01:46:31 -0400 [ReplicaFetcherThread-0-1] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_OR_PARTITION from the leader for partition test_koevn-2. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist.

これは、トピックを削除した後にレプリカ ノードによって表示される警告メッセージです。多くの情報を調べたところ、設定に delete.topic.enable=true を追加せずに再起動したことが原因という情報が多くありました。試してみましたが、問題は解決しませんでした。また、前回のテストで開始されたプロデューサープロセスとコンシューマープロセスを確認し、停止しました。最後に、すべてのコンシューマー グループ情報をチェックして、他の端末が接続されているかどうかを確認しました。

Terminal window
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server \
kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092 \
--all-groups --describe --command-config /opt/kafka/config/kraft/client.properties | grep test_koevn
---------------- output ------------------
test_koevn test_koevn 2 0 0 0 kafkaclient01-3-af11eb2f-f44b-431c-812a-652535730358 /10.100.8.11 kafkaclient01-3
test_koevn test_koevn 0 0 0 0 kafkaclient01-3-01ec3757-a139-42df-bc29-5b62aeae86d3 /10.100.8.11 kafkaclient01-3
test_koevn test_koevn 1 0 0 0 kafkaclient01-2-4ff5161a-0363-4623-9fb3-8e916d8d0fa8 /10.100.8.11 kafkaclient01-2
test_koevn test_koevn 3 0 0 0 kafkaclient01-2-fa058ab1-eafd-46df-8892-5d1d85fa23e4 /10.100.8.11 kafkaclient01-2

常時接続されている別のサービスがあります。 IPアドレスを見つけてログインしてみると、logstashであることがわかりました。

問題を解決する

これはlogstash設定のセクションです。設定ファイル内の topics および group_id 項目を変更し、logstash 設定を再ロードします。

Terminal window
input {
kafka {
bootstrap_servers => "kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092"
client_id => "kafkaclient01"
topics => ["test_koevn"]
group_id => "test_koevn"
auto_offset_reset => "latest"
partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
security_protocol => "SSL"
ssl_truststore_location => "/opt/logstash/config/certs/kafka_trustchain.jks"
ssl_truststore_password => "${kafka_truststore_password}"
ssl_keystore_location => "/opt/logstash/config/certs/kafka_client.jks"
ssl_keystore_password => "${kafka_keystore_password}"
consumer_threads => 4
decorate_events => true
}
}

すべての消費者グループ情報を再度表示する

Terminal window
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server \
kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092 \
--all-groups --describe --command-config /opt/kafka/config/kraft/client.properties | grep test_koevn
---------------- output ------------------
Consumer group 'test_koevn' has no active members.
test_koevn test_koevn 1 0 0 0 - - -
test_koevn test_koevn 3 0 0 0 - - -
test_koevn test_koevn 0 0 0 0 - - -
test_koevn test_koevn 2 0 0 0 - - -

接続されているクライアントがいないことがわかります。次に、上記の手順 2 の削除操作を実行すると、トピック test_koevn が正常に削除されます。

結論

この種の問題は、特にマルチクラスター ノード アーキテクチャにおいて、十分に厳密でないために発生します。設定ファイルの内容が、注意を払わずに誤って変更されます。テスト関連の手順を実行していると思っていましたが、テスト以外の環境に変更しました。幸いなことに、デプロイされた logstash は、優れたフォールト トレランスを備えたマルチノード モードになっています。そうしないと、間違った Kafka トピックを使用すると、ログ パネルに無数のデータが書き込まれてしまいます。