Alexandre Dupriez created KAFKA-14190:
-----------------------------------------
Summary: Corruption of Topic IDs with pre-2.8.0 admin clients
Key: KAFKA-14190
URL: https://issues.apache.org/jira/browse/KAFKA-14190
Project: Kafka
Issue Type: Bug
Components: admin, core, zkclient
Affects Versions: 3.2.1, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 2.8.1, 3.1.0
Reporter: Alexandre Dupriez
h4. Scope
The problem reported below has been verified to occur with ZooKeeper-based
controllers. It has not been attempted with KRaft controllers, although it is
unlikely to be reproducible in KRaft mode given the nature of the issue and the
clients involved.
h4. Problem Description
Topic IDs are lost when an AdminClient of version < 2.8.0 is used to increase
the number of partitions of a topic on a cluster of version >= 2.8.0. As a
result, the controller re-creates the topic ID upon restart, which eventually
conflicts with the topic ID recorded in the brokers’ partition.metadata files
in the partition directories of the impacted topic. This leads to an
availability loss of the partitions, which refuse leadership / follower-ship
when the topic ID indicated by a LeaderAndIsr request differs from their own
locally cached ID.
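The broker-side check that rejects such requests can be sketched as follows. This is a minimal illustration, not Kafka's actual internal API: the function name and signature below are assumptions, and the real validation lives in the broker's replica management code.

```python
# Sketch of the topic ID validation a broker performs when a LeaderAndIsr
# request arrives (names are illustrative, not Kafka's real internals).
def accept_leader_and_isr(cached_topic_id, request_topic_id):
    """Return True when the request may be applied to the local partition."""
    # A broker that has a topic ID persisted in partition.metadata refuses
    # any leadership/follower-ship change carrying a different topic ID.
    if cached_topic_id is not None and cached_topic_id != request_topic_id:
        return False
    return True

# After the corruption, the local file still holds the old ID while the
# restarted controller sends the newly generated one, so the broker rejects
# the request.
print(accept_leader_and_isr("jKTRaM_TSNqocJeQI2aYOQ", "nI-JQtPwQwGiylMfm8k13w"))  # False
```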
One post-corruption mitigation is to substitute the stale topic ID in the
partition.metadata files with the new topic ID referenced by the controller,
or alternatively, to delete the partition.metadata files altogether.
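The file rewrite can be sketched as below, assuming the simple key/value layout of {{partition.metadata}} (a {{version}} line followed by a {{topic_id}} line). This is an illustration, not a supported tool; the broker must be stopped before the file is modified.

```python
from pathlib import Path

def replace_topic_id(metadata_file: str, new_topic_id: str) -> None:
    """Substitute the topic_id value in a partition.metadata file.

    Assumes the 'key: value' layout, e.g.:
        version: 0
        topic_id: jKTRaM_TSNqocJeQI2aYOQ
    Run only while the broker is stopped.
    """
    path = Path(metadata_file)
    lines = path.read_text().splitlines()
    rewritten = [
        f"topic_id: {new_topic_id}" if line.startswith("topic_id:") else line
        for line in lines
    ]
    path.write_text("\n".join(rewritten) + "\n")
```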
h4. Steps to reproduce
1. Set up and launch a two-node Kafka cluster in ZooKeeper mode.
2. Create a topic, e.g. via {{kafka-topics.sh}}:
{noformat}
./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic
--partitions 2 --replication-factor 2{noformat}
3. Capture the topic ID using a 2.8.0+ client.
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic --describe
Topic: myTopic TopicId: jKTRaM_TSNqocJeQI2aYOQ PartitionCount: 2
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1{noformat}
4. Restart one of the brokers. This will make each broker create the
{{partition.metadata}} files in the partition directories, since it will
already have loaded the {{Log}} instances in memory.
5. Using a pre-2.8.0 client library, run the following command.
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic
--partitions 3{noformat}
6. Using a 2.8.0+ client library, describe the topic via ZooKeeper and notice
the absence of the topic ID from the output, where it would otherwise be
expected.
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic
Topic: myTopic PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
7. Using a 2.8.0+ client library, describe the topic via a broker endpoint and
notice the topic ID changed.
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic
Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myTopic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
8. Restart the controller.
9. Check the state-change.log file on the controller broker. Log entries of
the following type will appear.
{noformat}
[2022-08-25 17:44:05,308] ERROR [Broker id=0] Topic Id in memory:
jKTRaM_TSNqocJeQI2aYOQ does not match the topic Id for partition myTopic-0
provided in the request: nI-JQtPwQwGiylMfm8k13w. (state.change.logger){noformat}
10. Restart the other broker.
11. Describe the topic via the broker endpoint or ZooKeeper with a 2.8.0+
client library.
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic
Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3
ReplicationFactor: 2 Configs:
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0
Topic: myTopic Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0{noformat}
Notice the abnormal state of the topic: the ISR is reduced to a single broker,
which the controller claims to be the leader (here, broker 0). The controller
believes broker 0 is the leader because it does not handle the error responses
from the peer brokers when it sends the requests for them to become leader or
follower of a partition.
12. Verify produce is unavailable.
{noformat}
./kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic myTopic
[2022-08-25 17:52:59,962] ERROR Error when sending message to topic myTopic
with key: null, value: 1 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests
intended only for the leader, this error indicates that the broker is not the
current leader. For requests intended for any replica, this error indicates
that the broker is not a replica of the topic partition.
[2022-08-25 17:52:59,964] WARN [Producer clientId=console-producer] Received
invalid metadata error in produce request on partition myTopic-1 due to
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests
intended only for the leader, this error indicates that the broker is not the
current leader. For requests intended for any replica, this error indicates
that the broker is not a replica of the topic partition.. Going to request
metadata update now
(org.apache.kafka.clients.producer.internals.Sender){noformat}
13. Verify consume is unavailable.
{noformat}
./kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic myTopic
[2022-08-25 17:53:49,416] DEBUG [Consumer
clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008]
Received LIST_OFFSETS response from node 0 for request with header
RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6,
clientId=consumer-console-consumer-25008-1, correlationId=31):
ListOffsetsResponseData(throttleTimeMs=0,
topics=[ListOffsetsTopicResponse(name='myTopic',
partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=75,
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1),
ListOffsetsPartitionResponse(partitionIndex=2, errorCode=75,
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1),
ListOffsetsPartitionResponse(partitionIndex=0, errorCode=75,
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1)])])
(org.apache.kafka.clients.NetworkClient)
[2022-08-25 17:53:49,416] DEBUG [Consumer
clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008]
Attempt to fetch offsets for partition myTopic-1 failed due to
UNKNOWN_LEADER_EPOCH, retrying.
(org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}
h4. Follow-up
Currently, the ID of a topic is stored in the znode
/brokers/topics/<topic-name> along with the partition assignment. This is a
natural choice of location; however, the overwrite of the znode performed by
an “old” admin client destroys information which is then never recovered by
the cluster.
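The effect of the overwrite can be illustrated with plain dictionaries standing in for the znode JSON (the schema version numbers and topic ID below are illustrative, not authoritative): a pre-2.8.0 client re-serializes the full assignment with the only schema it knows, silently dropping the {{topic_id}} field.

```python
# Znode content as written by a 2.8.0+ controller (illustrative values).
znode_before = {
    "version": 3,
    "topic_id": "jKTRaM_TSNqocJeQI2aYOQ",
    "partitions": {"0": [1, 0], "1": [0, 1]},
}

# A pre-2.8.0 admin client increasing the partition count rewrites the whole
# znode using its own, older schema, which has no topic_id field: the ID is
# lost on write.
znode_after = {
    "version": 1,
    "partitions": {**znode_before["partitions"], "2": [1, 0]},
}

print("topic_id" in znode_after)  # False: the controller will regenerate it
```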
It would be tempting to keep the topic ID information in one single place;
however, each of the following approaches fails:
# Keeping the information in ZooKeeper only. Without being stored alongside
the partition’s data, locally on brokers, there would be no way for a broker
to know that the data associated with a partition belongs to a topic different
from the one currently referenced by the controller, defeating the purpose of
topic IDs.
# Keeping the information locally on brokers only. Ensuring consistency would
then require an extreme level of complexity, if it is at all possible without
the use of a strongly consistent data store.
Therefore, topic IDs have to be maintained in both locations. Given that this
information is immutable, this should not be a problem, except in the case
encountered here. Additionally, note that any client which references the
topic ID will also be impacted when the controller regenerates the ID after
finding it absent.
Since pre-2.8.0 clients, whether Kafka’s own or from third-party libraries,
are still widely used, it may be worth thinking about how to remediate this
problem for as long as ZooKeeper controllers are supported.
One way to prevent destroying the information contained in the topic
assignment znode could be to store it in an out-of-band znode, although the
feasibility and correctness of that approach need to be assessed. Such an
approach would likely add a significant level of complexity without enough
added benefit. Preserving consistency (especially if atomic reads/writes
across the two znodes are not possible) could prove difficult. Keeping full
backward compatibility with existing pre- and post-2.8.0 clients, and across
version upgrades, would add to the obstacles.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)