Alexandre Dupriez created KAFKA-14190:
-----------------------------------------
             Summary: Corruption of Topic IDs with pre-2.8.0 admin clients
                 Key: KAFKA-14190
                 URL: https://issues.apache.org/jira/browse/KAFKA-14190
             Project: Kafka
          Issue Type: Bug
          Components: admin, core, zkclient
    Affects Versions: 3.2.1, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 2.8.1, 3.1.0
            Reporter: Alexandre Dupriez

h4. Scope

The problem reported below has been verified to occur with ZooKeeper controllers. It has not been attempted with KRaft controllers, although it is unlikely to be reproducible in KRaft mode given the nature of the issue and the clients involved.

h4. Problem Description

Topic IDs are lost when an AdminClient of version < 2.8.0 is used to increase the number of partitions of a topic on a cluster running version >= 2.8.0. The controller then re-creates the topic ID upon restart, which eventually conflicts with the topic ID recorded in the brokers' {{partition.metadata}} files in the partition directories of the impacted topic. This results in a loss of availability for those partitions, which do not accept leadership / follower-ship when the topic ID indicated by a LeaderAndIsr request differs from their own locally cached ID.

One mitigation post-corruption is to substitute the stale topic ID in the {{partition.metadata}} files with the new topic ID referenced by the controller, or alternatively, to delete the {{partition.metadata}} files altogether.
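For reference, a minimal sketch of that mitigation, assuming a single log directory {{/tmp/kafka-logs}} and using the topic IDs from the reproduction steps below as placeholders (paths and IDs are illustrative and must be adapted to the affected cluster; the broker should be stopped while the files are modified):
{noformat}
# Inspect the topic ID cached locally for one of the impacted partitions.
cat /tmp/kafka-logs/myTopic-0/partition.metadata
version: 0
topic_id: jKTRaM_TSNqocJeQI2aYOQ

# Option 1: substitute the stale ID with the ID currently referenced by the controller.
sed -i 's/jKTRaM_TSNqocJeQI2aYOQ/nI-JQtPwQwGiylMfm8k13w/' /tmp/kafka-logs/myTopic-*/partition.metadata

# Option 2: delete the files; the broker recreates them with the controller's ID on restart.
rm /tmp/kafka-logs/myTopic-*/partition.metadata
{noformat}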
h4. Steps to reproduce

1. Set up and launch a two-node Kafka cluster in ZooKeeper mode.

2. Create a topic, e.g. via {{kafka-topics.sh}}:
{noformat}
./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic --partitions 2 --replication-factor 2
{noformat}

3. Capture the topic ID using a 2.8.0+ client:
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic --describe
Topic: myTopic  TopicId: jKTRaM_TSNqocJeQI2aYOQ  PartitionCount: 2  ReplicationFactor: 2  Configs: segment.bytes=1073741824
    Topic: myTopic  Partition: 0  Leader: 0  Replicas: 1,0  Isr: 0,1
    Topic: myTopic  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
{noformat}

4. Restart one of the brokers. This makes each broker create the {{partition.metadata}} files in the partition directories, since it will already have loaded the {{Log}} instances in memory.

5. Using a pre-2.8.0 client library, run the following command:
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic --partitions 3
{noformat}

6. Using a 2.8.0+ client library, describe the topic via ZooKeeper and notice the absence of the topic ID from the output, where it would otherwise be expected:
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic
Topic: myTopic  PartitionCount: 3  ReplicationFactor: 2  Configs:
    Topic: myTopic  Partition: 0  Leader: 1  Replicas: 1,0  Isr: 0,1
    Topic: myTopic  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
    Topic: myTopic  Partition: 2  Leader: 1  Replicas: 1,0  Isr: 1,0
{noformat}

7. Using a 2.8.0+ client library, describe the topic via a broker endpoint and notice that the topic ID changed:
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic
Topic: myTopic  TopicId: nI-JQtPwQwGiylMfm8k13w  PartitionCount: 3  ReplicationFactor: 2  Configs: segment.bytes=1073741824
    Topic: myTopic  Partition: 0  Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: myTopic  Partition: 1  Leader: 1  Replicas: 0,1  Isr: 1,0
    Topic: myTopic  Partition: 2  Leader: 1  Replicas: 1,0  Isr: 1,0
{noformat}

8. Restart the controller.

9. Check the state-change.log file on the controller broker. The following type of log entries will appear:
{noformat}
[2022-08-25 17:44:05,308] ERROR [Broker id=0] Topic Id in memory: jKTRaM_TSNqocJeQI2aYOQ does not match the topic Id for partition myTopic-0 provided in the request: nI-JQtPwQwGiylMfm8k13w. (state.change.logger)
{noformat}

10. Restart the other broker.

11. Describe the topic via the broker endpoint or ZooKeeper with a 2.8.0+ client library:
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic
Topic: myTopic  TopicId: nI-JQtPwQwGiylMfm8k13w  PartitionCount: 3  ReplicationFactor: 2  Configs:
    Topic: myTopic  Partition: 0  Leader: 0  Replicas: 1,0  Isr: 0
    Topic: myTopic  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0
    Topic: myTopic  Partition: 2  Leader: 0  Replicas: 1,0  Isr: 0
{noformat}
Notice the abnormal state of the topic: the ISR is reduced to a single broker, which the controller claims is the leader (here, broker 0). The controller believes broker 0 is the leader because it does not handle the error responses returned by the peer brokers when it requests them to become leader or follower of a partition.

12. Verify that produce is unavailable:
{noformat}
./kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic myTopic
[2022-08-25 17:52:59,962] ERROR Error when sending message to topic myTopic with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
[2022-08-25 17:52:59,964] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition myTopic-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
{noformat}

13. Verify that consume is unavailable:
{noformat}
./kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic myTopic
[2022-08-25 17:53:49,416] DEBUG [Consumer clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008] Received LIST_OFFSETS response from node 0 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=consumer-console-consumer-25008-1, correlationId=31): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='myTopic', partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=75, oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1), ListOffsetsPartitionResponse(partitionIndex=2, errorCode=75, oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1), ListOffsetsPartitionResponse(partitionIndex=0, errorCode=75, oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1)])]) (org.apache.kafka.clients.NetworkClient)
[2022-08-25 17:53:49,416] DEBUG [Consumer clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008] Attempt to fetch offsets for partition myTopic-1 failed due to UNKNOWN_LEADER_EPOCH, retrying. (org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}

h4. Follow-up

Currently, the ID of a topic is stored in the znode /brokers/topics/<topic-name> along with the partition assignment.
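For illustration, the payload of that znode can be inspected with {{zookeeper-shell.sh}}. The JSON below is indicative only — the exact schema version and field layout depend on the broker and client versions involved — but it shows how the topic ID written by a 2.8.0+ controller disappears once a pre-2.8.0 client overwrites the znode:
{noformat}
./kafka/bin/zookeeper-shell.sh :2181 get /brokers/topics/myTopic

# Before the partition increase, as written by a 2.8.0+ controller:
{"partitions":{"0":[1,0],"1":[0,1]},"topic_id":"jKTRaM_TSNqocJeQI2aYOQ","adding_replicas":{},"removing_replicas":{},"version":3}

# After the partition increase performed with a pre-2.8.0 client (topic_id is gone):
{"version":2,"partitions":{"0":[1,0],"1":[0,1],"2":[1,0]},"adding_replicas":{},"removing_replicas":{}}
{noformat}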
This is a natural choice of location; however, the overwrite of the znode performed by an “old” admin client destroys information which is not subsequently recovered by the cluster. It would be tempting to keep the topic ID information in one single place, however each of the following approaches fails:
# Keeping the information in ZooKeeper only. Without being stored with the partition's data, locally on brokers, there would be no way for a broker to know that the data associated with a partition belongs to a topic different from the one currently referenced by the controller, defeating the purpose of topic IDs.
# Keeping the information locally on brokers only. Ensuring consistency would then require an extreme level of complexity, if it is at all possible without the use of a strongly consistent data store.

Therefore, topic IDs have to be maintained in both locations. Given that this information is immutable, this should not be a problem, except in the case encountered here. Additionally, note that any client which references the topic ID will also be impacted when the controller regenerates an ID it finds to be absent.

Since pre-2.8.0 clients, whether from the Kafka client or third-party libraries, are still widely used, it may be worth thinking about how to remediate the problem for as long as ZooKeeper controllers are supported. One way to prevent destroying the information contained in the topic assignment znode could be to store it in an out-of-band znode, although the feasibility and correctness of that approach need to be assessed. Such an approach would likely add a significant level of complexity without enough added benefit. Preserving consistency (especially if atomic reads/writes to the two znodes are not possible) could prove difficult. Keeping full backward compatibility with existing pre- and post-2.8.0 clients, and across version upgrades, would add to the obstacles.
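In the meantime, a corruption of this kind can be avoided operationally by performing partition increases with a 2.8.0+ client against the brokers rather than directly against ZooKeeper, since the request is then handled by the controller, which is expected to preserve the existing topic ID. For example (command for illustration only, assuming a 2.8.0+ {{kafka-topics.sh}}):
{noformat}
./bin/kafka-topics.sh --bootstrap-server :9092 --alter --topic myTopic --partitions 3
{noformat}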