This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 572a57c62e6d78b371605e3731e48a1dd18d38d4
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Thu Feb 28 15:47:02 2019 +0800

    [FLINK-10342] [kafka] Improve Javadoc of new disableFilterRestoredPartitionsWithSubscribedTopics method in FlinkKafkaConsumerBase
    
    This closes #7726.
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   | 28 ++++++++++++----------
 .../kafka/FlinkKafkaConsumerBaseTest.java          |  2 +-
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 57198e5..6cf1839 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -140,12 +140,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        private boolean enableCommitOnCheckpoints = true;
 
        /**
-        * User-set flag to disable filter restored partitions with current
-        * discovered partitions. It's enabled by default since otherwise will 
result in
-        * unexpected behaviors - e.g. When changing the topic name, or remove 
some topics,
-        * The removed/renamed partitions will be still consumed.
+        * User-set flag to disable filtering restored partitions with current 
topics descriptor.
         */
-       private boolean filterRestoredPartitionsWithDiscovered = true;
+       private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = 
true;
 
        /**
         * The offset commit mode for the consumer.
@@ -470,15 +467,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                return this;
        }
 
-       /* Disable filtering the restored partitions with discovered partitions.
-
-        * Note: this may result in un-wanted behaviors: e.g. When changing the
-        * topic name, or remove some topics, the removed/renamed partitions
-        * will be still consumed.
+       /**
+        * By default, when restoring from a checkpoint / savepoint, the 
consumer always
+        * ignores restored partitions that are no longer associated with the 
current specified topics or
+        * topic pattern to subscribe to.
+        *
+        * <p>This method configures the consumer to not filter the restored 
partitions,
+        * therefore always attempting to consume whatever partition was 
present in the
+        * previous execution regardless of the specified topics to subscribe 
to in the
+        * current execution.
+        *
         * @return The consumer object, to allow function chaining.
         */
-       public FlinkKafkaConsumerBase<T> 
disableFilterRestoredPartitionsWithDiscovered() {
-               this.filterRestoredPartitionsWithDiscovered = false;
+       public FlinkKafkaConsumerBase<T> 
disableFilterRestoredPartitionsWithSubscribedTopics() {
+               this.filterRestoredPartitionsWithCurrentTopicsDescriptor = 
false;
                return this;
        }
 
@@ -526,7 +528,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                }
                        }
 
-                       if (filterRestoredPartitionsWithDiscovered) {
+                       if 
(filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                                
subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                                        if 
(!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                                                LOG.warn(
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index a5625bd..b578dac 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -350,7 +350,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                                        .collect(Collectors.toList())));
                final FlinkKafkaConsumerBase<String> consumer = new 
DummyFlinkKafkaConsumer<>(initKafkaTopics, discoverer);
                if (disableFiltering) {
-                       
consumer.disableFilterRestoredPartitionsWithDiscovered();
+                       
consumer.disableFilterRestoredPartitionsWithSubscribedTopics();
                }
 
                final TestingListState<Tuple2<KafkaTopicPartition, Long>> 
listState = new TestingListState<>();

Reply via email to