This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a4f0124512a424510f1dcdbe7855c92617e78c6
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Thu Feb 28 15:47:02 2019 +0800

    [FLINK-10342] [kafka] Improve Javadoc of new disableFilterRestoredPartitionsWithSubscribedTopics method in FlinkKafkaConsumerBase

    This closes #7726.
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java | 28 ++++++++++++----------
 .../kafka/FlinkKafkaConsumerBaseTest.java | 2 +-
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 57198e5..6cf1839 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -140,12 +140,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private boolean enableCommitOnCheckpoints = true;
 
 	/**
-	 * User-set flag to disable filter restored partitions with current
-	 * discovered partitions. It's enabled by default since otherwise will result in
-	 * unexpected behaviors - e.g. When changing the topic name, or remove some topics,
-	 * The removed/renamed partitions will be still consumed.
+	 * User-set flag to disable filtering restored partitions with current topics descriptor.
 	 */
-	private boolean filterRestoredPartitionsWithDiscovered = true;
+	private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true;
 
 	/**
 	 * The offset commit mode for the consumer.
@@ -470,15 +467,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		return this;
 	}
 
-	/* Disable filtering the restored partitions with discovered partitions.
-
-	 * Note: this may result in un-wanted behaviors: e.g. When changing the
-	 * topic name, or remove some topics, the removed/renamed partitions
-	 * will be still consumed.
+	/**
+	 * By default, when restoring from a checkpoint / savepoint, the consumer always
+	 * ignores restored partitions that are no longer associated with the current specified topics or
+	 * topic pattern to subscribe to.
+	 *
+	 * <p>This method configures the consumer to not filter the restored partitions,
+	 * therefore always attempting to consume whatever partition was present in the
+	 * previous execution regardless of the specified topics to subscribe to in the
+	 * current execution.
+	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
-	public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithDiscovered() {
-		this.filterRestoredPartitionsWithDiscovered = false;
+	public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() {
+		this.filterRestoredPartitionsWithCurrentTopicsDescriptor = false;
 		return this;
 	}
 
@@ -526,7 +528,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 			}
 
-			if (filterRestoredPartitionsWithDiscovered) {
+			if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
 				subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
 					if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
 						LOG.warn(
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index a5625bd..b578dac 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -350,7 +350,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 					.collect(Collectors.toList())));
 		final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(initKafkaTopics, discoverer);
 		if (disableFiltering) {
-			consumer.disableFilterRestoredPartitionsWithDiscovered();
+			consumer.disableFilterRestoredPartitionsWithSubscribedTopics();
 		}
 
 		final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();