Mayuresh Gharat created KAFKA-7096:
--------------------------------------
Summary: Consumer should drop the data for unassigned topic
partitions
Key: KAFKA-7096
URL: https://issues.apache.org/jira/browse/KAFKA-7096
Project: Kafka
Issue Type: Improvement
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
currently if a client has assigned topics : T1, T2, T3 and calls poll(), the
poll might fetch data for partitions for all 3 topics T1, T2, T3. Now if the
client unassigns some topics (for example T3) and calls poll() we still hold
the data (for T3) in the completedFetches queue until we actually reach the
buffered data for the unassigned Topics (T3 in our example) on subsequent
poll() calls, at which point we drop that data. This process of holding the
data is unnecessary.
When a client creates a topic, it takes time for the broker to fetch ACLs for
the topic. But during this time, the client will issue fetchRequest for the
topic, it will get response for the partitions of this topic. The response
consist of TopicAuthorizationException for each of the partitions. This
response for each partition is wrapped with a completedFetch and added to the
completedFetches queue. Now when the client calls the next poll() it sees the
TopicAuthorizationException from the first buffered CompletedFetch. At this
point the client chooses to sleep for 1.5 min as a backoff (as per the design),
hoping that the Broker fetches the ACL from ACL store in the meantime. Actually
the Broker has already fetched the ACL by this time. When the client calls
poll() after the sleep, it again sees the TopicAuthorizationException from the
second completedFetch and it sleeps again. So it takes (1.5 * 60 * partitions)
seconds before the client can see any data. With this patch, the client when it
sees the first TopicAuthorizationException, it can all assign(EmptySet), which
will get rid of the buffered completedFetches (those with
TopicAuthorizationException) and it can again call assign(TopicPartitions)
before calling poll(). With this patch we found that client was able to get the
records as soon as the Broker fetched the ACLs from ACL store.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)