[jira] [Commented] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-09-10 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610096#comment-16610096 ]

ASF GitHub Bot commented on KAFKA-7096:
---

lindong28 closed pull request #5289: KAFKA-7096 : Clear buffered data for unassigned topicPartitions
URL: https://github.com/apache/kafka/pull/5289

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd295f..4cdc4f862ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
                 }
 
                 throwIfNoAssignorsConfigured();
-
+                fetcher.clearBufferedDataForUnassignedTopics(topics);
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                 this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public void subscribe(Pattern pattern) {
     public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
-            log.info("Unsubscribed all topics or patterns and assigned partitions");
+            fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
+            log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
         }
@@ -1063,6 +1064,7 @@ public void assign(Collection<TopicPartition> partitions) {
                         throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                     topics.add(topic);
                 }
+                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa233ab..a92f57e7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public void onAssignment(Set<TopicPartition> assignment) {
         sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
+    /**
+     * Clear the buffered data which are not a part of newly assigned partitions
+     *
+     * @param assignedPartitions  newly assigned {@link TopicPartition}
+     */
+    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+        Iterator<CompletedFetch> itr = completedFetches.iterator();
+        while (itr.hasNext()) {
+            TopicPartition tp = itr.next().partition;
+            if (!assignedPartitions.contains(tp)) {
+                itr.remove();
+            }
+        }
+        if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
+            nextInLineRecords.drain();
+            nextInLineRecords = null;
+        }
+    }
+
+    /**
+     * Clear the buffered data which are not a part of newly assigned topics
+     *
+     * @param assignedTopics  newly assigned topics
+     */
+    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (assignedTopics.contains(tp.topic())) {
+                currentTopicPartitions.add(tp);
+            }
+        }
+        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+    }
+
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
         fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d8c9f..afe5b2fa
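
The filtering done by the two new Fetcher methods in the diff can be tried outside Kafka with the minimal sketch below. It is not Kafka code and makes several assumptions: `TopicPartition` and `CompletedFetch` are simplified stand-in records (Java 16+), `completedFetches` is a plain `ArrayDeque`, `nextInLine` models `nextInLineRecords` as a single fetch, and the `assigned` field stands in for `subscriptions.assignedPartitions()`. It shows the same idea as the patch: drop every buffered fetch whose partition is not in the new assignment, and for a topic-based subscribe, derive the partitions to keep from the current assignment.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the buffering touched by KAFKA-7096.
// None of these types are Kafka's; they only mimic the shape of Fetcher.
public class BufferedFetchSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (records give value equality).
    record TopicPartition(String topic, int partition) {}

    // Stand-in for Fetcher's internal CompletedFetch: only its partition matters here.
    record CompletedFetch(TopicPartition partition) {}

    // Fetch responses buffered but not yet returned by poll().
    final ArrayDeque<CompletedFetch> completedFetches = new ArrayDeque<>();
    // Stand-in for nextInLineRecords: the fetch poll() is currently draining, or null.
    CompletedFetch nextInLine;
    // Stand-in for subscriptions.assignedPartitions().
    final Set<TopicPartition> assigned = new HashSet<>();

    // Same filtering as the patch's clearBufferedDataForUnassignedPartitions.
    void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
        Iterator<CompletedFetch> itr = completedFetches.iterator();
        while (itr.hasNext()) {
            if (!assignedPartitions.contains(itr.next().partition())) {
                itr.remove(); // drop data for a partition that is no longer wanted
            }
        }
        if (nextInLine != null && !assignedPartitions.contains(nextInLine.partition())) {
            nextInLine = null; // the real code calls drain() before discarding
        }
    }

    // Same idea as clearBufferedDataForUnassignedTopics: keep the currently
    // assigned partitions whose topic is still in the new topic list.
    void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
        Set<TopicPartition> kept = new HashSet<>();
        for (TopicPartition tp : assigned) {
            if (assignedTopics.contains(tp.topic())) {
                kept.add(tp);
            }
        }
        clearBufferedDataForUnassignedPartitions(kept);
    }

    public static void main(String[] args) {
        BufferedFetchSketch fetcher = new BufferedFetchSketch();
        // Assigned T1, T2, T3, with one buffered fetch per partition.
        for (String topic : List.of("T1", "T2", "T3")) {
            TopicPartition tp = new TopicPartition(topic, 0);
            fetcher.assigned.add(tp);
            fetcher.completedFetches.add(new CompletedFetch(tp));
        }
        fetcher.nextInLine = new CompletedFetch(new TopicPartition("T3", 0));

        // Re-subscribing to T1 and T2 drops T3's buffered data right away,
        // instead of waiting for poll() to reach it.
        fetcher.clearBufferedDataForUnassignedTopics(List.of("T1", "T2"));
        System.out.println(fetcher.completedFetches.size()); // prints 2
        System.out.println(fetcher.nextInLine);               // prints null
    }
}
```

Note that the topic-based variant can only keep partitions that are currently assigned. In the patched `subscribe()` it runs before the new subscription is recorded, so it filters against the old assignment.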

[jira] [Commented] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522768#comment-16522768 ]

Mayuresh Gharat commented on KAFKA-7096:


[~lindong]: [https://github.com/apache/kafka/pull/5289]

 

> Consumer should drop the data for unassigned topic partitions
> -
>
> Key: KAFKA-7096
> URL: https://issues.apache.org/jira/browse/KAFKA-7096
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Currently, if a client is assigned topics T1, T2 and T3 and calls poll(), the
> poll might fetch data for partitions of all three topics. If the client then
> unassigns some topics (for example T3) and calls poll(), we still hold the
> data for T3 in the completedFetches queue until subsequent poll() calls
> actually reach the buffered data for the unassigned topics, at which point we
> drop that data. Holding this data is unnecessary.
> When a client creates a topic, it takes time for the broker to fetch the ACLs
> for the topic. During this time, the client issues fetch requests for the
> topic and gets a response for each of its partitions. Each response consists
> of a TopicAuthorizationException, which is wrapped in a CompletedFetch and
> added to the completedFetches queue. On the next poll(), the client sees the
> TopicAuthorizationException from the first buffered CompletedFetch. At this
> point the client chooses to sleep for 1.5 minutes as a backoff (per its
> design), expecting the broker to fetch the ACL from the ACL store in the
> meantime. In fact, the broker has already fetched the ACL by then. When the
> client calls poll() after the sleep, it sees the TopicAuthorizationException
> from the second CompletedFetch and sleeps again. As a result, it takes
> (1.5 * 60 * partitions) seconds, i.e. 90 seconds per partition, before the
> client sees any data. With this patch, when the client sees the first
> TopicAuthorizationException, it can call assign(EmptySet), which discards the
> buffered completedFetches (those holding the TopicAuthorizationException),
> and then call assign(TopicPartitions) again before calling poll(). With this
> patch, we found that the client received records as soon as the broker had
> fetched the ACLs from the ACL store.
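
The delay arithmetic in the description can be checked with a toy model. This is a hypothetical sketch, not Kafka code, and it assumes exactly one stale TopicAuthorizationException buffered per partition, one error surfaced per poll(), a fixed 90-second (1.5 min) backoff after each error, and that the ACL is already available when the first error is seen. The `clearOnError` flag models the assign(EmptySet) workaround that the patch makes possible.

```java
// Hypothetical model of the delay described in KAFKA-7096; not Kafka code.
public class BackoffSketch {
    // 1.5 min backoff applied each time poll() surfaces an authorization error.
    static final long BACKOFF_SECONDS = 90;

    // Total backoff before the first poll() that returns data. Assumes one stale
    // error is buffered per partition and the broker already has the ACL by the
    // time the first error is seen, so every error after the first is stale.
    static long secondsUntilData(int partitions, boolean clearOnError) {
        int bufferedErrors = partitions;
        long waited = 0;
        while (bufferedErrors > 0) {
            bufferedErrors--;            // poll() surfaces one buffered error
            waited += BACKOFF_SECONDS;   // application sleeps before retrying
            if (clearOnError) {
                bufferedErrors = 0;      // assign(empty) drops the remaining errors
            }
        }
        return waited;                   // the next poll() can return fresh data
    }

    public static void main(String[] args) {
        System.out.println(secondsUntilData(10, false)); // prints 900
        System.out.println(secondsUntilData(10, true));  // prints 90
    }
}
```

With 10 partitions the model gives 900 seconds without clearing and 90 seconds with it, matching the (1.5 * 60 * partitions) estimate above.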



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)