问题背景

遇到这个问题,是之前调试日志,创建了一个topic和consumer groups,后面不用了执行删除该topic和consumer groups,结果删除操作命令没有报错,查看topic结果还在,这把我搞郁闷了,这啥情况?

我的使用环境如下:

主机名IP端口
kafka01.koevn.com10.100.10.19092
kafka02.koevn.com10.100.10.29092
kafka03.koevn.com10.100.10.39092
kafkaclient0110.100.8.11随机

1.查看kafka topic

Terminal window
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka01.koevn.com:9092,\
kafka02.koevn.com:9092,kafka03.koevn.com:9092 --list \
--command-config /opt/kafka/config/kraft/client.properties
---------------- output ------------------
__consumer_offsets
test_koevn

--command-config: 指定加密证书路径添加的参数

此时显示的topic有个test_koevn,这个就是要删除的topic,执行以下命令执行删除

2.删除指定topic

Terminal window
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka01.koevn.com:9092,\
kafka02.koevn.com:9092,kafka03.koevn.com:9092 --delete --topic test_koevn \
--command-config /opt/kafka/config/kraft/client.properties

⚠️ 注意 注意broker我是指定了三个,由于我使用Kafka KRaft三节点集群模式,且多副本和多分区配置,如果你只添加一个broker执行删除topic操作,你也只是删除这个节点的topic,其他节点副本仍会把数据同步过来,所以要完成删除topic,需要指定全部kafka集群节点,都执行删除本节点topic

此时执行完命令后,终端无报错,但再次查看topic,发现test_koevn还在,也就表示删除失败!

而且在删除的时候,查看kafka server.properties配置文件定义的log.dirs路径,会发现topic目录显示这种情况。

Terminal window
test_koevn-0
test_koevn-0-delete
test_koevn-1
test_koevn-1-delete
test_koevn-2
test_koevn-2-delete
test_koevn-3
test_koevn-3-delete
test_koevn-4
test_koevn-4-delete

test_koevn-0-delete这种目录过一会儿会自动删除,但是你执行删除topic操作,又是这种情况。

3.定位问题

查看kafka logs/server.log日志,有这么一段信息

Terminal window
2025-03-12 01:46:31 -0400 [ReplicaFetcherThread-0-1] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_OR_PARTITION from the leader for partition test_koevn-2. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist.

这是执行删除topic后,副本节点显示的告警信息,查了很多资料,很多都说配置没有添加delete.topic.enable=true并重启导致的,我尝试了问题并没有解决,我也检查了之前测试启动的生产者和消费者进程,都停止,最后查看所有消费组信息,看看有没有其他终端在连。

Terminal window
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server \
kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092 \
--all-groups --describe --command-config /opt/kafka/config/kraft/client.properties | grep test_koevn
---------------- output ------------------
test_koevn test_koevn 2 0 0 0 kafkaclient01-3-af11eb2f-f44b-431c-812a-652535730358 /10.100.8.11 kafkaclient01-3
test_koevn test_koevn 0 0 0 0 kafkaclient01-3-01ec3757-a139-42df-bc29-5b62aeae86d3 /10.100.8.11 kafkaclient01-3
test_koevn test_koevn 1 0 0 0 kafkaclient01-2-4ff5161a-0363-4623-9fb3-8e916d8d0fa8 /10.100.8.11 kafkaclient01-2
test_koevn test_koevn 3 0 0 0 kafkaclient01-2-fa058ab1-eafd-46df-8892-5d1d85fa23e4 /10.100.8.11 kafkaclient01-2

居然有另外一个服务一直在连,找到该IP地址,登录查看居然是logstash

4.解决问题

这是logstash其中一段配置,把配置文件里的topicsgroup_id项改掉,重载logstash配置

Terminal window
input {
kafka {
bootstrap_servers => "kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092"
client_id => "kafkaclient01"
topics => ["test_koevn"]
group_id => "test_koevn"
auto_offset_reset => "latest"
partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
security_protocol => "SSL"
ssl_truststore_location => "/opt/logstash/config/certs/kafka_trustchain.jks"
ssl_truststore_password => "${kafka_truststore_password}"
ssl_keystore_location => "/opt/logstash/config/certs/kafka_client.jks"
ssl_keystore_password => "${kafka_keystore_password}"
consumer_threads => 4
decorate_events => true
}
}

再次查看所有消费组信息

Terminal window
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server \
kafka01.koevn.com:9092,kafka02.koevn.com:9092,kafka03.koevn.com:9092 \
--all-groups --describe --command-config /opt/kafka/config/kraft/client.properties | grep test_koevn
---------------- output ------------------
Consumer group 'test_koevn' has no active members.
test_koevn test_koevn 1 0 0 0 - - -
test_koevn test_koevn 3 0 0 0 - - -
test_koevn test_koevn 0 0 0 0 - - -
test_koevn test_koevn 2 0 0 0 - - -

已经看到没有客户端在连接了,然后执行上面第2步删除操作,topic test_koevn就正常删除了。

5.结语

出现这种问题还是做事不够严谨,特别是多集群节点架构,配置文件内容一不留神就修改错误,以为在做测试相关步骤,但改到非测试环境了,好在部署的logstash是多节点模式,具有较好容错率,不然消费错误的kafka topic,日志面板就无数据了!