This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 879507d2a7553a60b361519941915c6645e9559c Author: okidogi <okido...@gmail.com> AuthorDate: Sun Feb 17 22:08:14 2019 +0100 [FLINK-10342] [kafka] Filter restored partitions in FlinkKafkaConsumer with topics descriptor This commit lets the FlinkKafkaConsumer filter out restoreed partition's offsets that are no longer associated with the current list of specific topics / topic pattern to subscribe to. This changes the previous default behaviour of the FlinkKafkaConsumer, which always respected the complete list of restored partitions, regardless of the specified topics to subscribe to. As a fallback, a setter configuration method is added to allow disabling the filter behavior. --- .../connectors/kafka/FlinkKafkaConsumerBase.java | 32 ++++ .../internals/AbstractPartitionDiscoverer.java | 2 +- .../kafka/internals/KafkaTopicsDescriptor.java | 17 ++- .../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 23 ++- .../kafka/FlinkKafkaConsumerBaseTest.java | 170 +++++++++++++++++++-- .../kafka/internals/KafkaTopicsDescriptorTest.java | 64 ++++++++ 6 files changed, 285 insertions(+), 23 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index ad9fc4f..57198e5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -140,6 +140,14 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti private boolean enableCommitOnCheckpoints = true; /** + * User-set flag to disable filter restored partitions with current + * discovered partitions. It's enabled by default since otherwise will result in + * unexpected behaviors - e.g. When changing the topic name, or remove some topics, + * The removed/renamed partitions will be still consumed. + */ + private boolean filterRestoredPartitionsWithDiscovered = true; + + /** * The offset commit mode for the consumer. * The value of this can only be determined in {@link FlinkKafkaConsumerBase#open(Configuration)} since it depends * on whether or not checkpointing is enabled for the job. @@ -462,6 +470,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti return this; } + /* Disable filtering the restored partitions with discovered partitions. + + * Note: this may result in un-wanted behaviors: e.g. When changing the + * topic name, or remove some topics, the removed/renamed partitions + * will be still consumed. + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithDiscovered() { + this.filterRestoredPartitionsWithDiscovered = false; + return this; + } + // ------------------------------------------------------------------------ // Work methods // ------------------------------------------------------------------------ @@ -506,6 +526,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } } + if (filterRestoredPartitionsWithDiscovered) { + subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> { + if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) { + LOG.warn( + "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.", + entry.getKey()); + return true; + } + return false; + }); + } + LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets); } else { diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java index cca24b7..2caae73 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java @@ -135,7 +135,7 @@ public abstract class AbstractPartitionDiscoverer { // retain topics that match the pattern Iterator<String> iter = matchedTopics.iterator(); while (iter.hasNext()) { - if (!topicsDescriptor.getTopicPattern().matcher(iter.next()).matches()) { + if (!topicsDescriptor.isMatchingTopic(iter.next())) { iter.remove(); } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java index ddea63b..2fa7c28 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java @@ -60,12 +60,21 @@ public class KafkaTopicsDescriptor implements Serializable { return topicPattern != null; } - public List<String> getFixedTopics() { - return fixedTopics; + /** + * Check if the input topic matches the topics described by this KafkaTopicDescriptor. + * + * @return true if found a match. + */ + public boolean isMatchingTopic(String topic) { + if (isFixedTopics()) { + return getFixedTopics().contains(topic); + } else { + return topicPattern.matcher(topic).matches(); + } } - public Pattern getTopicPattern() { - return topicPattern; + public List<String> getFixedTopics() { + return fixedTopics; } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 6d8f1fa..9284c6d 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -88,6 +89,12 @@ public class FlinkKafkaConsumerBaseMigrationTest { PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L); } + private static final List<String> TOPICS = new ArrayList<>(PARTITION_STATE.keySet()) + .stream() + .map(p -> p.getTopic()) + .distinct() + .collect(Collectors.toList()); + private final MigrationVersion testMigrateVersion; @Parameterized.Parameters(name = "Migration Savepoint: {0}") @@ -135,7 +142,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet()); final DummyFlinkKafkaConsumer<String> consumerFunction = - new DummyFlinkKafkaConsumer<>(fetcher, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); + new DummyFlinkKafkaConsumer<>(fetcher, TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -192,6 +199,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { public void testRestoreFromEmptyStateNoPartitions() throws Exception { final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>( + Collections.singletonList("dummy-topic"), Collections.<KafkaTopicPartition>emptyList(), FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); @@ -231,7 +239,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet()); final DummyFlinkKafkaConsumer<String> consumerFunction = - new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); + new DummyFlinkKafkaConsumer<>(TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -283,7 +291,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet()); final DummyFlinkKafkaConsumer<String> consumerFunction = - new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); + new DummyFlinkKafkaConsumer<>(TOPICS, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED); StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -327,7 +335,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet()); final DummyFlinkKafkaConsumer<String> consumerFunction = - new DummyFlinkKafkaConsumer<>(partitions, 1000L); // discovery enabled + new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -363,11 +371,12 @@ public class FlinkKafkaConsumerBaseMigrationTest { @SuppressWarnings("unchecked") DummyFlinkKafkaConsumer( AbstractFetcher<T, ?> fetcher, + List<String> topics, List<KafkaTopicPartition> partitions, long discoveryInterval) { super( - Arrays.asList("dummy-topic"), + topics, null, (KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class), discoveryInterval, @@ -377,8 +386,8 @@ public class FlinkKafkaConsumerBaseMigrationTest { this.partitions = partitions; } - DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions, long discoveryInterval) { - this(mock(AbstractFetcher.class), partitions, discoveryInterval); + DummyFlinkKafkaConsumer(List<String> topics, List<KafkaTopicPartition> partitions, long discoveryInterval) { + this(mock(AbstractFetcher.class), topics, partitions, discoveryInterval); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index c49c852..a5625bd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -51,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -76,6 +77,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.everyItem; @@ -251,6 +253,124 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode()); } + /** + * Tests that subscribed partitions didn't change when there's no change + * on the intial topics. (filterRestoredPartitionsWithDiscovered is active) + */ + @Test + public void testSetFilterRestoredParitionsNoChange() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + false); + } + + /** + * Tests that removed partitions will be removed from subscribed partitions + * Even if it's still in restored partitions. + * (filterRestoredPartitionsWithDiscovered is active) + */ + @Test + public void testSetFilterRestoredParitionsWithRemovedTopic() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1"}), + Arrays.asList(new String[]{"kafka_topic_1"}), + false); + } + + /** + * Tests that newly added partitions will be added to subscribed partitions. + * (filterRestoredPartitionsWithDiscovered is active) + */ + @Test + public void testSetFilterRestoredParitionsWithAddedTopic() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + false); + } + + /** + * Tests that subscribed partitions are the same when there's no + * change on the intial topics. + * (filterRestoredPartitionsWithDiscovered is disabled) + */ + @Test + public void testDisableFilterRestoredParitionsNoChange() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + true); + } + + /** + * Tests that removed partitions will not be removed from subscribed partitions + * Even if it's still in restored partitions. + * (filterRestoredPartitionsWithDiscovered is disabled) + */ + @Test + public void testDisableFilterRestoredParitionsWithRemovedTopic() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + true); + } + + /** + * Tests that newly added partitions will be added to subscribed partitions. + * (filterRestoredPartitionsWithDiscovered is disabled) + */ + @Test + public void testDisableFilterRestoredParitionsWithAddedTopic() throws Exception { + checkFilterRestoredPartitionsWithDisovered( + Arrays.asList(new String[]{"kafka_topic_1"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}), + true); + } + + private void checkFilterRestoredPartitionsWithDisovered( + List<String> restoredKafkaTopics, + List<String> initKafkaTopics, + List<String> expectedSubscribedPartitions, + Boolean disableFiltering) throws Exception { + final AbstractPartitionDiscoverer discoverer = new TestPartitionDiscoverer( + new KafkaTopicsDescriptor(initKafkaTopics, null), + 0, + 1, + TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(initKafkaTopics), + TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn( + initKafkaTopics.stream() + .map(topic -> new KafkaTopicPartition(topic, 0)) + .collect(Collectors.toList()))); + final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(initKafkaTopics, discoverer); + if (disableFiltering) { + consumer.disableFilterRestoredPartitionsWithDiscovered(); + } + + final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>(); + + for (int i = 0; i < restoredKafkaTopics.size(); i++) { + listState.add(new Tuple2<>(new KafkaTopicPartition(restoredKafkaTopics.get(i), 0), 12345L)); + } + + setupConsumer(consumer, true, listState, true, 0, 1); + + Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = consumer.getSubscribedPartitionsToStartOffsets(); + + assertEquals(new HashSet<>(expectedSubscribedPartitions), + subscribedPartitionsToStartOffsets + .keySet() + .stream() + .map(partition -> partition.getTopic()) + .collect(Collectors.toSet())); + } + @Test @SuppressWarnings("unchecked") public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception { @@ -585,15 +705,17 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { AbstractStreamOperatorTestHarness<String>[] testHarnesses = new AbstractStreamOperatorTestHarness[initialParallelism]; + List<String> testTopics = Collections.singletonList("test-topic"); + for (int i = 0; i < initialParallelism; i++) { TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer( - new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null), + new KafkaTopicsDescriptor(testTopics, null), i, initialParallelism, - TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")), + TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(testTopics), TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsOnStartup)); - consumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false); + consumers[i] = new DummyFlinkKafkaConsumer<>(testTopics, partitionDiscoverer); testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i); // initializeState() is always called, null signals that we didn't restore @@ -644,13 +766,13 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { mergedState, maxParallelism, initialParallelism, restoredParallelism, i); TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer( - new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null), + new KafkaTopicsDescriptor(testTopics, null), i, restoredParallelism, - TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")), + TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(testTopics), TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsAfterRestore)); - restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false); + restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(testTopics, partitionDiscoverer); restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i); // initializeState() is always called, null signals that we didn't restore @@ -869,6 +991,16 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @SuppressWarnings("unchecked") + DummyFlinkKafkaConsumer(List<String> topics, AbstractPartitionDiscoverer abstractPartitionDiscoverer) { + this( + () -> mock(AbstractFetcher.class), + abstractPartitionDiscoverer, + false, + PARTITION_DISCOVERY_DISABLED, + topics); + } + + @SuppressWarnings("unchecked") DummyFlinkKafkaConsumer(SupplierWithException<AbstractFetcher<T, ?>, Exception> abstractFetcherSupplier, AbstractPartitionDiscoverer abstractPartitionDiscoverer, long discoveryIntervalMillis) { this(abstractFetcherSupplier, abstractPartitionDiscoverer, false, discoveryIntervalMillis); } @@ -904,13 +1036,29 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { AbstractPartitionDiscoverer testPartitionDiscoverer, boolean isAutoCommitEnabled, long discoveryIntervalMillis) { + this( + testFetcherSupplier, + testPartitionDiscoverer, + isAutoCommitEnabled, + discoveryIntervalMillis, + Collections.singletonList("dummy-topic") + ); + } + + @SuppressWarnings("unchecked") + DummyFlinkKafkaConsumer( + SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier, + AbstractPartitionDiscoverer testPartitionDiscoverer, + boolean isAutoCommitEnabled, + long discoveryIntervalMillis, + List<String> topics) { super( - Collections.singletonList("dummy-topic"), - null, - (KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class), - discoveryIntervalMillis, - false); + topics, + null, + (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class), + discoveryIntervalMillis, + false); this.testFetcherSupplier = testFetcherSupplier; this.testPartitionDiscoverer = testPartitionDiscoverer; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java new file mode 100644 index 0000000..fa3faec --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Tests for the {@link KafkaTopicsDescriptor}. + */ +@RunWith(Parameterized.class) +public class KafkaTopicsDescriptorTest { + + @Parameterized.Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + { "topic1", null, Arrays.asList("topic1", "topic2", "topic3"), true }, + { "topic1", null, Arrays.asList("topic2", "topic3"), false }, + { "topic1", Pattern.compile("topic[0-9]"), null, true }, + { "topicx", Pattern.compile("topic[0-9]"), null, false } + }); + } + + private String topic; + private Pattern topicPattern; + private List<String> fixedTopics; + boolean expected; + + public KafkaTopicsDescriptorTest(String topic, Pattern topicPattern, List<String> fixedTopics, boolean expected) { + this.topic = topic; + this.topicPattern = topicPattern; + this.fixedTopics = fixedTopics; + this.expected = expected; + } + + @Test + public void testIsMatchingTopic() { + KafkaTopicsDescriptor topicsDescriptor = new KafkaTopicsDescriptor(fixedTopics, topicPattern); + + Assert.assertEquals(expected, topicsDescriptor.isMatchingTopic(topic)); + } +}