Ryan Leslie created KAFKA-12257:
-----------------------------------

             Summary: Consumer mishandles topics deleted and recreated with the same name
                 Key: KAFKA-12257
                 URL: https://issues.apache.org/jira/browse/KAFKA-12257
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.2.0
            Reporter: Ryan Leslie


In KAFKA-7738, caching of leader epochs (KIP-320) was added to o.a.k.c.Metadata 
to ignore metadata responses with epochs smaller than the last seen epoch.
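
Conceptually, the check works along these lines (a simplified sketch, not the actual client code; the method name is illustrative):
{code:java}
// Simplified sketch of the KIP-320 staleness check in o.a.k.c.Metadata:
// a partition's metadata is only accepted if its leader epoch is at least
// the last epoch seen for that partition.
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs = new HashMap<>();

boolean shouldAcceptMetadata(TopicPartition tp, int newEpoch) {
    Integer lastEpoch = lastSeenLeaderEpochs.get(tp);
    if (lastEpoch == null || newEpoch >= lastEpoch) {
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return true;  // first or newer epoch: use this metadata
    }
    return false;     // smaller epoch: response treated as stale and ignored
}{code}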

The current implementation can cause problems when a consumer is subscribed to 
a topic that has been deleted and then recreated with the same name. This is 
seen most often in consumers that subscribe to many topics using a wildcard 
pattern.
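
For reference, a wildcard subscription looks like this (the regex is illustrative):
{code:java}
// Pattern (wildcard) subscription; topics matching the pattern may be
// deleted and recreated while the consumer is running.
consumer.subscribe(Pattern.compile("myTopicPrefix\\..*"));{code}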

Currently, when a topic is deleted and the Fetcher receives 
UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If, while the 
consumer is still running, a topic is later created with the same name, the 
leader epochs for the new topic's partitions are set to 0 and are likely 
smaller than those for the previous topic. For example, if a broker had 
restarted during the lifespan of the previous topic, the leader epoch would be 
at least 1 or 2. In this case the metadata is ignored because it is incorrectly 
considered stale. Of course, the user will sometimes get lucky: if the previous 
topic was created recently enough that its epochs are still 0, no problem 
occurs on recreation. The issue is also not seen when consumers happen to have 
been restarted between deletion and recreation.

The most common side effect of the new metadata being disregarded is that the 
new partitions end up assigned, but the Fetcher is unable to fetch data because 
it does not know the leaders. When a topic is recreated with the same name, the 
partition leaders are likely not the same as for the previous topic, and the 
number of partitions may even differ. Besides not being able to retrieve data 
for the new topic, there is a more sinister side effect: the Fetcher triggers a 
metadata update after the fetch fails, and the subsequent update will again 
ignore the topic's metadata if the leader epoch is still smaller than the 
cached value. This metadata refresh loop can continue indefinitely, and with a 
sufficient number of consumers it may even put a strain on the cluster, since 
the requests occur in a tight loop. The problem can also be hard for clients to 
identify, since nothing is logged by default that would indicate what's 
happening. Both the Metadata class's logging of "_Not replacing existing 
epoch_" and the Fetcher's logging of "_Leader for partition <T-P> is unknown_" 
are at DEBUG level.
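
To surface these while diagnosing, the relevant loggers can be raised to DEBUG, e.g. with log4j properties (logger names follow the classes mentioned above):
{noformat}
log4j.logger.org.apache.kafka.clients.Metadata=DEBUG
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG{noformat}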

A second possible side effect was observed: if the consumer is acting as leader 
of the group and happens to have no current metadata for the previous topic, 
e.g. it was cleared due to a metadata error from a broker failure, then the new 
topic's partitions may simply end up unassigned within the group. This is 
because, although the subscription list contains the recreated topic, its 
metadata was previously ignored due to the leader epoch check. In this case the 
user would see logs such as:
{noformat}
WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, 
groupId=myGroup] The following subscribed topics are not assigned to any 
members: [myTopic]{noformat}
Interestingly, I believe the Producer is less affected by this problem since 
ProducerMetadata explicitly clears knowledge of its topics in retainTopics() 
after each metadata expiration. ConsumerMetadata does no such thing.
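
As a rough paraphrase (not the actual ProducerMetadata source), the producer-side behavior amounts to something like:
{code:java}
// Rough paraphrase, not the actual source: topics no longer retained are
// dropped entirely, so a recreated topic starts from a clean slate.
void retainTopics(Collection<String> retained) {
    topics.keySet().removeIf(topic -> !retained.contains(topic));
}{code}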

To reproduce this issue:
 # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
org.apache.kafka.clients.Metadata
 # Begin a consumer for a topic (or multiple topics)
 # Restart a broker that happens to be a leader for one of the topic's 
partitions
 # Delete the topic
 # Create another topic with the same name (a scripted sketch of steps 4 and 5 
follows this list)
 # Publish data for the new topic
 # The consumer will not receive data for the new topic, and the issue will 
only be corrected by restarting the consumer or restarting brokers until leader 
epochs are large enough
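
Steps 4 and 5 can be scripted, e.g. with the AdminClient (a minimal sketch; the topic name, partition/replica counts, and bootstrap address are illustrative):
{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class RecreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Step 4: delete the topic while the consumer is still running
            admin.deleteTopics(Collections.singleton("myTopic")).all().get();
            Thread.sleep(5000); // give the deletion time to complete
            // Step 5: recreate with the same name; leader epochs restart at 0
            admin.createTopics(Collections.singleton(
                new NewTopic("myTopic", 4, (short) 3))).all().get();
        }
    }
}{code}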

I believe KIP-516 (unique topic IDs) will likely fix this problem, since after 
those changes the leader epoch map should be keyed off the topic ID rather than 
the name.

One possible workaround with the current version of Kafka is to add code to 
onPartitionsRevoked() to manually clear leader epochs before each rebalance, 
e.g.
{code:java}
// Reflection hack: clear the consumer's cached leader epochs so that metadata
// for a recreated topic is not rejected as stale. Requires commons-lang3
// (org.apache.commons.lang3.reflect.FieldUtils).
Map<TopicPartition, Integer> emptyLeaderEpochs = new HashMap<>();
ConsumerMetadata metadata =
        (ConsumerMetadata) FieldUtils.readField(consumer, "metadata", true);
FieldUtils.writeField(metadata, "lastSeenLeaderEpochs", emptyLeaderEpochs, true);
{code}
This is not really recommended, of course, since besides modifying private 
consumer state it defeats the purpose of epochs! In a sense it reverts the 
consumer to pre-2.2 behavior, before leader epochs existed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)