This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 879507d2a7553a60b361519941915c6645e9559c
Author: okidogi <okido...@gmail.com>
AuthorDate: Sun Feb 17 22:08:14 2019 +0100

    [FLINK-10342] [kafka] Filter restored partitions in FlinkKafkaConsumer with topics descriptor
    
    This commit lets the FlinkKafkaConsumer filter out restored partition
    offsets that are no longer associated with the current list of specific
    topics / topic pattern to subscribe to.
    
    This changes the previous default behaviour of the FlinkKafkaConsumer,
    which always respected the complete list of restored partitions,
    regardless of the specified topics to subscribe to.
    As a fallback, a setter configuration method is added to allow disabling
    the filter behavior.
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  32 ++++
 .../internals/AbstractPartitionDiscoverer.java     |   2 +-
 .../kafka/internals/KafkaTopicsDescriptor.java     |  17 ++-
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |  23 ++-
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 170 +++++++++++++++++++--
 .../kafka/internals/KafkaTopicsDescriptorTest.java |  64 ++++++++
 6 files changed, 285 insertions(+), 23 deletions(-)
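
For readers of this change, a minimal usage sketch follows (illustrative only, not part of the commit). It assumes the universal FlinkKafkaConsumer from flink-connector-kafka; the broker address, group id, topic names, and job name are made up for illustration. The opt-out setter shown is the one added by this commit.

// Illustrative sketch only: topic names, properties, and job name are hypothetical.
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RestoredPartitionFilteringExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		// With this change, offsets restored for topics that are no longer subscribed
		// (e.g. a removed "topic_b") are filtered out by default on restore.
		FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
			Arrays.asList("topic_a", "topic_c"),
			new SimpleStringSchema(),
			props);

		// Fallback to the previous behaviour: keep all restored partitions,
		// even for topics that are no longer subscribed.
		consumer.disableFilterRestoredPartitionsWithDiscovered();

		env.addSource(consumer).print();
		env.execute("restored-partition-filtering-example");
	}
}
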

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index ad9fc4f..57198e5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -140,6 +140,14 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
        private boolean enableCommitOnCheckpoints = true;
 
        /**
+        * User-set flag to disable filtering restored partitions against the currently
+        * discovered partitions. It is enabled by default, since disabling it can lead to
+        * unexpected behavior: e.g. when a topic is renamed or removed, partitions of the
+        * removed/renamed topic would still be consumed.
+        */
+       private boolean filterRestoredPartitionsWithDiscovered = true;
+
+       /**
         * The offset commit mode for the consumer.
         * The value of this can only be determined in {@link FlinkKafkaConsumerBase#open(Configuration)} since it depends
         * on whether or not checkpointing is enabled for the job.
@@ -462,6 +470,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                return this;
        }
 
+       /**
+        * Disables filtering of the restored partitions against the discovered partitions.
+        *
+        * <p>Note: this may result in unwanted behavior: e.g. when a topic is renamed or
+        * removed, the partitions of the removed/renamed topic would still be consumed.
+        * @return The consumer object, to allow function chaining.
+        */
+       public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithDiscovered() {
+               this.filterRestoredPartitionsWithDiscovered = false;
+               return this;
+       }
+
        // ------------------------------------------------------------------------
        //  Work methods
        // ------------------------------------------------------------------------
@@ -506,6 +526,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                                }
                        }
 
+                       if (filterRestoredPartitionsWithDiscovered) {
+                               subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
+                                       if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
+                                               LOG.warn(
+                                                       "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
+                                                       entry.getKey());
+                                               return true;
+                                       }
+                                       return false;
+                               });
+                       }
+
                        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
                } else {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index cca24b7..2caae73 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -135,7 +135,7 @@ public abstract class AbstractPartitionDiscoverer {
                                        // retain topics that match the pattern
                                        Iterator<String> iter = matchedTopics.iterator();
                                        while (iter.hasNext()) {
-                                               if (!topicsDescriptor.getTopicPattern().matcher(iter.next()).matches()) {
+                                               if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                                                        iter.remove();
                                                }
                                        }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
index ddea63b..2fa7c28 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
@@ -60,12 +60,21 @@ public class KafkaTopicsDescriptor implements Serializable {
                return topicPattern != null;
        }
 
-       public List<String> getFixedTopics() {
-               return fixedTopics;
+       /**
+        * Checks whether the given topic matches the topics described by this KafkaTopicsDescriptor.
+        *
+        * @return true if a match is found.
+        */
+       public boolean isMatchingTopic(String topic) {
+               if (isFixedTopics()) {
+                       return getFixedTopics().contains(topic);
+               } else {
+                       return topicPattern.matcher(topic).matches();
+               }
        }
 
-       public Pattern getTopicPattern() {
-               return topicPattern;
+       public List<String> getFixedTopics() {
+               return fixedTopics;
        }
 
        @Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 6d8f1fa..9284c6d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -54,6 +54,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -88,6 +89,12 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L);
        }
 
+       private static final List<String> TOPICS = new ArrayList<>(PARTITION_STATE.keySet())
+               .stream()
+               .map(p -> p.getTopic())
+               .distinct()
+               .collect(Collectors.toList());
+
        private final MigrationVersion testMigrateVersion;
 
        @Parameterized.Parameters(name = "Migration Savepoint: {0}")
@@ -135,7 +142,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
                final DummyFlinkKafkaConsumer<String> consumerFunction =
-                       new DummyFlinkKafkaConsumer<>(fetcher, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
+                       new DummyFlinkKafkaConsumer<>(fetcher, TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                                new StreamSource<>(consumerFunction);
@@ -192,6 +199,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
        public void testRestoreFromEmptyStateNoPartitions() throws Exception {
                final DummyFlinkKafkaConsumer<String> consumerFunction =
                                new DummyFlinkKafkaConsumer<>(
+                                       Collections.singletonList("dummy-topic"),
                                        Collections.<KafkaTopicPartition>emptyList(),
                                        FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
@@ -231,7 +239,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
                final DummyFlinkKafkaConsumer<String> consumerFunction =
-                       new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
+                       new DummyFlinkKafkaConsumer<>(TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                                new StreamSource<>(consumerFunction);
@@ -283,7 +291,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
                final DummyFlinkKafkaConsumer<String> consumerFunction =
-                       new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
+                       new DummyFlinkKafkaConsumer<>(TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                                new StreamSource<>(consumerFunction);
@@ -327,7 +335,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
                final DummyFlinkKafkaConsumer<String> consumerFunction =
-                       new DummyFlinkKafkaConsumer<>(partitions, 1000L); // discovery enabled
+                       new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                        new StreamSource<>(consumerFunction);
@@ -363,11 +371,12 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                @SuppressWarnings("unchecked")
                DummyFlinkKafkaConsumer(
                                AbstractFetcher<T, ?> fetcher,
+                               List<String> topics,
                                List<KafkaTopicPartition> partitions,
                                long discoveryInterval) {
 
                        super(
-                               Arrays.asList("dummy-topic"),
+                               topics,
                                null,
                                (KafkaDeserializationSchema<T>) mock(KafkaDeserializationSchema.class),
                                discoveryInterval,
@@ -377,8 +386,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                        this.partitions = partitions;
                }
 
-               DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions, long discoveryInterval) {
-                       this(mock(AbstractFetcher.class), partitions, discoveryInterval);
+               DummyFlinkKafkaConsumer(List<String> topics, List<KafkaTopicPartition> partitions, long discoveryInterval) {
+                       this(mock(AbstractFetcher.class), topics, partitions, discoveryInterval);
                }
 
                @Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index c49c852..a5625bd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -76,6 +77,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.everyItem;
@@ -251,6 +253,124 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
        }
 
+       /**
+        * Tests that the subscribed partitions do not change when there is no change
+        * in the initial topics. (filterRestoredPartitionsWithDiscovered is active)
+        */
+       @Test
+       public void testSetFilterRestoredParitionsNoChange() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       false);
+       }
+
+       /**
+        * Tests that partitions of a removed topic are removed from the subscribed
+        * partitions, even if they are still present in the restored partitions.
+        * (filterRestoredPartitionsWithDiscovered is active)
+        */
+       @Test
+       public void testSetFilterRestoredParitionsWithRemovedTopic() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1"}),
+                       Arrays.asList(new String[]{"kafka_topic_1"}),
+                       false);
+       }
+
+       /**
+        * Tests that partitions of a newly added topic are added to the subscribed
+        * partitions. (filterRestoredPartitionsWithDiscovered is active)
+        */
+       @Test
+       public void testSetFilterRestoredParitionsWithAddedTopic() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       false);
+       }
+
+       /**
+        * Tests that the subscribed partitions stay the same when there is no
+        * change in the initial topics.
+        * (filterRestoredPartitionsWithDiscovered is disabled)
+        */
+       @Test
+       public void testDisableFilterRestoredParitionsNoChange() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       true);
+       }
+
+       /**
+        * Tests that partitions of a removed topic are not removed from the subscribed
+        * partitions if they are still present in the restored partitions.
+        * (filterRestoredPartitionsWithDiscovered is disabled)
+        */
+       @Test
+       public void testDisableFilterRestoredParitionsWithRemovedTopic() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       true);
+       }
+
+       /**
+        * Tests that partitions of a newly added topic are added to the subscribed
+        * partitions. (filterRestoredPartitionsWithDiscovered is disabled)
+        */
+       @Test
+       public void testDisableFilterRestoredParitionsWithAddedTopic() throws Exception {
+               checkFilterRestoredPartitionsWithDisovered(
+                       Arrays.asList(new String[]{"kafka_topic_1"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
+                       true);
+       }
+
+       private void checkFilterRestoredPartitionsWithDisovered(
+                       List<String> restoredKafkaTopics,
+                       List<String> initKafkaTopics,
+                       List<String> expectedSubscribedPartitions,
+                       Boolean disableFiltering) throws Exception {
+               final AbstractPartitionDiscoverer discoverer = new TestPartitionDiscoverer(
+                       new KafkaTopicsDescriptor(initKafkaTopics, null),
+                       0,
+                       1,
+                       TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(initKafkaTopics),
+                       TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(
+                               initKafkaTopics.stream()
+                                       .map(topic -> new KafkaTopicPartition(topic, 0))
+                                       .collect(Collectors.toList())));
+               final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(initKafkaTopics, discoverer);
+               if (disableFiltering) {
+                       consumer.disableFilterRestoredPartitionsWithDiscovered();
+               }
+
+               final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
+
+               for (int i = 0; i < restoredKafkaTopics.size(); i++) {
+                       listState.add(new Tuple2<>(new KafkaTopicPartition(restoredKafkaTopics.get(i), 0), 12345L));
+               }
+
+               setupConsumer(consumer, true, listState, true, 0, 1);
+
+               Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = consumer.getSubscribedPartitionsToStartOffsets();
+
+               assertEquals(new HashSet<>(expectedSubscribedPartitions),
+                       subscribedPartitionsToStartOffsets
+                               .keySet()
+                               .stream()
+                               .map(partition -> partition.getTopic())
+                               .collect(Collectors.toSet()));
+       }
+
        @Test
        @SuppressWarnings("unchecked")
        public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
@@ -585,15 +705,17 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                AbstractStreamOperatorTestHarness<String>[] testHarnesses =
                        new AbstractStreamOperatorTestHarness[initialParallelism];
 
+               List<String> testTopics = Collections.singletonList("test-topic");
+
                for (int i = 0; i < initialParallelism; i++) {
                        TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
-                               new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null),
+                               new KafkaTopicsDescriptor(testTopics, null),
                                i,
                                initialParallelism,
-                               TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")),
+                               TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(testTopics),
                                TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsOnStartup));
 
-                       consumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false);
+                       consumers[i] = new DummyFlinkKafkaConsumer<>(testTopics, partitionDiscoverer);
                        testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
 
                        // initializeState() is always called, null signals that we didn't restore
@@ -644,13 +766,13 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                                mergedState, maxParallelism, initialParallelism, restoredParallelism, i);
 
                        TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
-                               new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null),
+                               new KafkaTopicsDescriptor(testTopics, null),
                                i,
                                restoredParallelism,
-                               TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")),
+                               TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(testTopics),
                                TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsAfterRestore));
 
-                       restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false);
+                       restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(testTopics, partitionDiscoverer);
                        restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i);
 
                        // initializeState() is always called, null signals that we didn't restore
@@ -869,6 +991,16 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                }
 
                @SuppressWarnings("unchecked")
+               DummyFlinkKafkaConsumer(List<String> topics, AbstractPartitionDiscoverer abstractPartitionDiscoverer) {
+                       this(
+                               () -> mock(AbstractFetcher.class),
+                               abstractPartitionDiscoverer,
+                               false,
+                               PARTITION_DISCOVERY_DISABLED,
+                               topics);
+               }
+
                @SuppressWarnings("unchecked")
                DummyFlinkKafkaConsumer(SupplierWithException<AbstractFetcher<T, ?>, Exception> abstractFetcherSupplier, AbstractPartitionDiscoverer abstractPartitionDiscoverer, long discoveryIntervalMillis) {
                        this(abstractFetcherSupplier, abstractPartitionDiscoverer, false, discoveryIntervalMillis);
                }
@@ -904,13 +1036,29 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                                AbstractPartitionDiscoverer testPartitionDiscoverer,
                                boolean isAutoCommitEnabled,
                                long discoveryIntervalMillis) {
+                       this(
+                               testFetcherSupplier,
+                               testPartitionDiscoverer,
+                               isAutoCommitEnabled,
+                               discoveryIntervalMillis,
+                               Collections.singletonList("dummy-topic")
+                               );
+               }
+
+               @SuppressWarnings("unchecked")
+               DummyFlinkKafkaConsumer(
+                       SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier,
+                       AbstractPartitionDiscoverer testPartitionDiscoverer,
+                       boolean isAutoCommitEnabled,
+                       long discoveryIntervalMillis,
+                       List<String> topics) {
 
                        super(
-                                       Collections.singletonList("dummy-topic"),
-                                       null,
-                                       (KafkaDeserializationSchema<T>) mock(KafkaDeserializationSchema.class),
-                                       discoveryIntervalMillis,
-                                       false);
+                               topics,
+                               null,
+                               (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
+                               discoveryIntervalMillis,
+                               false);
 
                        this.testFetcherSupplier = testFetcherSupplier;
                        this.testPartitionDiscoverer = testPartitionDiscoverer;
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java
new file mode 100644
index 0000000..fa3faec
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for the {@link KafkaTopicsDescriptor}.
+ */
+@RunWith(Parameterized.class)
+public class KafkaTopicsDescriptorTest {
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                       { "topic1", null, Arrays.asList("topic1", "topic2", "topic3"), true },
+                       { "topic1", null, Arrays.asList("topic2", "topic3"), false },
+                       { "topic1", Pattern.compile("topic[0-9]"), null, true },
+                       { "topicx", Pattern.compile("topic[0-9]"), null, false }
+               });
+       }
+
+       private String topic;
+       private Pattern topicPattern;
+       private List<String> fixedTopics;
+       boolean expected;
+
+       public KafkaTopicsDescriptorTest(String topic, Pattern topicPattern, List<String> fixedTopics, boolean expected) {
+               this.topic = topic;
+               this.topicPattern = topicPattern;
+               this.fixedTopics = fixedTopics;
+               this.expected = expected;
+       }
+
+       @Test
+       public void testIsMatchingTopic() {
+               KafkaTopicsDescriptor topicsDescriptor = new KafkaTopicsDescriptor(fixedTopics, topicPattern);
+
+               Assert.assertEquals(expected, topicsDescriptor.isMatchingTopic(topic));
+       }
+}
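
As a quick illustration of the matching semantics exercised by the new test, here is a small sketch of KafkaTopicsDescriptor#isMatchingTopic based on the cases above (illustrative only, not part of the commit; the class name IsMatchingTopicSketch and the topic names are made up):

// Illustrative sketch: fixed-topic descriptors match by list membership,
// pattern descriptors match by regex, mirroring the parameterized test data.
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

public class IsMatchingTopicSketch {

	public static void main(String[] args) {
		// Fixed-topic descriptor: a topic matches if it is contained in the fixed list.
		KafkaTopicsDescriptor fixed = new KafkaTopicsDescriptor(Arrays.asList("topic1", "topic2"), null);
		System.out.println(fixed.isMatchingTopic("topic1")); // true
		System.out.println(fixed.isMatchingTopic("topic3")); // false

		// Pattern descriptor: a topic matches if it matches the topic pattern.
		KafkaTopicsDescriptor pattern = new KafkaTopicsDescriptor(null, Pattern.compile("topic[0-9]"));
		System.out.println(pattern.isMatchingTopic("topic7")); // true
		System.out.println(pattern.isMatchingTopic("topicx")); // false
	}
}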
