[
https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610096#comment-16610096
]
ASF GitHub Bot commented on KAFKA-7096:
---
lindong28 closed pull request #5289: KAFKA-7096 : Clear buffered data for
unassigned topicPartitions
URL: https://github.com/apache/kafka/pull/5289
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd295f..4cdc4f862ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public void subscribe(Collection topics,
ConsumerRebalanceListener liste
}
throwIfNoAssignorsConfigured();
-
+fetcher.clearBufferedDataForUnassignedTopics(topics);
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ",
"));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public void subscribe(Pattern pattern) {
public void unsubscribe() {
acquireAndEnsureOpen();
try {
-log.info("Unsubscribed all topics or patterns and assigned
partitions");
+
fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
+log.info("Unsubscribed all topics or patterns and assigned
partitions");
} finally {
release();
}
@@ -1063,6 +1064,7 @@ public void assign(Collection partitions)
{
throw new IllegalArgumentException("Topic partitions
to assign to cannot have null or empty topic");
topics.add(topic);
}
+fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// make sure the offsets of topic partitions the consumer is
unsubscribing from
// are committed since there will be no following rebalance
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa233ab..a92f57e7347 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public void onAssignment(Set assignment)
{
sensors.updatePartitionLagAndLeadSensors(assignment);
}
+/**
+ * Clear the buffered data which are not a part of newly assigned
partitions
+ *
+ * @param assignedPartitions newly assigned {@link TopicPartition}
+ */
+public void
clearBufferedDataForUnassignedPartitions(Collection
assignedPartitions) {
+Iterator itr = completedFetches.iterator();
+while (itr.hasNext()) {
+TopicPartition tp = itr.next().partition;
+if (!assignedPartitions.contains(tp)) {
+itr.remove();
+}
+}
+if (nextInLineRecords != null &&
!assignedPartitions.contains(nextInLineRecords.partition)) {
+nextInLineRecords.drain();
+nextInLineRecords = null;
+}
+}
+
+/**
+ * Clear the buffered data which are not a part of newly assigned topics
+ *
+ * @param assignedTopics newly assigned topics
+ */
+public void clearBufferedDataForUnassignedTopics(Collection
assignedTopics) {
+Set currentTopicPartitions = new HashSet<>();
+for (TopicPartition tp : subscriptions.assignedPartitions()) {
+if (assignedTopics.contains(tp.topic())) {
+currentTopicPartitions.add(tp);
+}
+}
+clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+}
+
public static Sensor throttleTimeSensor(Metrics metrics,
FetcherMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
new Avg());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d8c9f..afe5b2fa