This is an automated email from the ASF dual-hosted git repository.

masc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 00c9c8c7 [FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster (#97)
00c9c8c7 is described below

commit 00c9c8c74121136a0c1710ac77f307dc53adae99
Author: Ignas Daukšas <ign...@gmail.com>
AuthorDate: Tue Apr 30 02:13:37 2024 +0300

    [FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster (#97)
---
 .../enumerator/DynamicKafkaSourceEnumerator.java   |  48 ++++---
 .../dynamic/source/DynamicKafkaSourceITTest.java   | 144 ++++++++++++++++++++-
 .../DynamicKafkaSourceEnumeratorTest.java          |  99 ++++++++++++++
 .../kafka/DynamicKafkaSourceTestHelper.java        |  27 ++--
 4 files changed, 284 insertions(+), 34 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index e14a36d9..20e8b923 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -288,23 +288,34 @@ public class DynamicKafkaSourceEnumerator
 
         // create enumerators
         for (Entry<String, Set<String>> activeClusterTopics : latestClusterTopicsMap.entrySet()) {
-            final Set<TopicPartition> activeTopicPartitions = new HashSet<>();
-
-            if (dynamicKafkaSourceEnumState
+            KafkaSourceEnumState kafkaSourceEnumState =
+                    dynamicKafkaSourceEnumState
                             .getClusterEnumeratorStates()
-                            .get(activeClusterTopics.getKey())
-                    != null) {
-                Set<TopicPartition> oldTopicPartitions =
-                        dynamicKafkaSourceEnumState
-                                .getClusterEnumeratorStates()
-                                .get(activeClusterTopics.getKey())
-                                .assignedPartitions();
+                            .get(activeClusterTopics.getKey());
+
+            final KafkaSourceEnumState newKafkaSourceEnumState;
+            if (kafkaSourceEnumState != null) {
+                final Set<String> activeTopics = activeClusterTopics.getValue();
+
                 // filter out removed topics
-                for (TopicPartition oldTopicPartition : oldTopicPartitions) {
-                    if (activeClusterTopics.getValue().contains(oldTopicPartition.topic())) {
-                        activeTopicPartitions.add(oldTopicPartition);
-                    }
-                }
+                Set<TopicPartition> activeAssignedPartitions =
+                        kafkaSourceEnumState.assignedPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+                Set<TopicPartition> activeUnassignedInitialPartitions =
+                        kafkaSourceEnumState.unassignedInitialPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                activeAssignedPartitions,
+                                activeUnassignedInitialPartitions,
+                                kafkaSourceEnumState.initialDiscoveryFinished());
+            } else {
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false);
             }
 
             // restarts enumerator from state using only the active topic partitions, to avoid
@@ -312,12 +323,7 @@ public class DynamicKafkaSourceEnumerator
             createEnumeratorWithAssignedTopicPartitions(
                     activeClusterTopics.getKey(),
                     activeClusterTopics.getValue(),
-                    dynamicKafkaSourceEnumState
-                            .getClusterEnumeratorStates()
-                            .getOrDefault(
-                                    activeClusterTopics.getKey(),
-                                    new KafkaSourceEnumState(
-                                            Collections.emptySet(), Collections.emptySet(), false)),
+                    newKafkaSourceEnumState,
                     clusterProperties.get(activeClusterTopics.getKey()));
         }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index 4ea1bd7d..edd86756 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -372,6 +373,132 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                                     .collect(Collectors.toList()));
         }
 
+        @Test
+        void testTopicReAddMigrationUsingFileMetadataService() throws Throwable {
+            // setup topics
+            int kafkaClusterIdx = 0;
+            String topic1 = "test-topic-re-add-1";
+            String topic2 = "test-topic-re-add-2";
+            DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic1, NUM_PARTITIONS);
+            DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic2, NUM_PARTITIONS);
+
+            // Flink job config and env
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setRestartStrategy(RestartStrategies.noRestart());
+            env.setParallelism(2);
+            Properties properties = new Properties();
+            properties.setProperty(
+                    KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "1000");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "5000");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD.key(),
+                    "2");
+            properties.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, "dynamic-kafka-src");
+
+            // create new metadata file to consume from 1 cluster
+            String testStreamId = "test-topic-re-add-stream";
+            File metadataFile = File.createTempFile(testDir.getPath() + "/metadata", ".yaml");
+            YamlFileMetadataService yamlFileMetadataService =
+                    new YamlFileMetadataService(metadataFile.getPath(), Duration.ofMillis(100));
+            writeClusterMetadataToFile(
+                    metadataFile,
+                    testStreamId,
+                    ImmutableList.of(topic1),
+                    ImmutableList.of(
+                            DynamicKafkaSourceTestHelper.getKafkaClusterTestEnvMetadata(
+                                    kafkaClusterIdx)));
+
+            DynamicKafkaSource<Integer> dynamicKafkaSource =
+                    DynamicKafkaSource.<Integer>builder()
+                            .setStreamIds(Collections.singleton(testStreamId))
+                            .setKafkaMetadataService(yamlFileMetadataService)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            IntegerDeserializer.class))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setProperties(properties)
+                            .build();
+
+            DataStreamSource<Integer> stream =
+                    env.fromSource(
+                            dynamicKafkaSource,
+                            WatermarkStrategy.noWatermarks(),
+                            "dynamic-kafka-src");
+            List<Integer> results = new ArrayList<>();
+
+            int stage1Records =
+                    DynamicKafkaSourceTestHelper.produceToKafka(
+                            kafkaClusterIdx, topic1, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, 0);
+            int stage2Records =
+                    DynamicKafkaSourceTestHelper.produceToKafka(
+                            kafkaClusterIdx,
+                            topic2,
+                            NUM_PARTITIONS,
+                            NUM_RECORDS_PER_SPLIT,
+                            stage1Records);
+
+            try (CloseableIterator<Integer> iterator = stream.executeAndCollect()) {
+                CommonTestUtils.waitUtil(
+                        () -> {
+                            try {
+                                results.add(iterator.next());
+
+                                // switch to second topic after first is read
+                                if (results.size() == stage1Records) {
+                                    writeClusterMetadataToFile(
+                                            metadataFile,
+                                            testStreamId,
+                                            ImmutableList.of(topic2),
+                                            ImmutableList.of(
+                                                    DynamicKafkaSourceTestHelper
+                                                            .getKafkaClusterTestEnvMetadata(
+                                                                    kafkaClusterIdx)));
+                                }
+
+                                // re-add first topic again after second is read
+                                // produce another batch to first topic
+                                if (results.size() == stage2Records) {
+                                    DynamicKafkaSourceTestHelper.produceToKafka(
+                                            kafkaClusterIdx,
+                                            topic1,
+                                            NUM_PARTITIONS,
+                                            NUM_RECORDS_PER_SPLIT,
+                                            stage2Records);
+                                    writeClusterMetadataToFile(
+                                            metadataFile,
+                                            testStreamId,
+                                            ImmutableList.of(topic1, topic2),
+                                            ImmutableList.of(
+                                                    DynamicKafkaSourceTestHelper
+                                                            .getKafkaClusterTestEnvMetadata(
+                                                                    kafkaClusterIdx)));
+                                }
+                            } catch (NoSuchElementException e) {
+                                // swallow and wait
+                            } catch (IOException e) {
+                                throw new UncheckedIOException(e);
+                            } catch (Throwable e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            // first batch of topic 1 * 2 + topic 2 + second batch of topic 1
+                            return results.size() == NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 4;
+                        },
+                        Duration.ofSeconds(15),
+                        "Could not schedule callable within timeout");
+            }
+
+            // verify data
+            Stream<Integer> expectedFullRead =
+                    IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 3).boxed();
+            Stream<Integer> expectedReRead =
+                    IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT).boxed();
+            List<Integer> expectedResults =
+                    Stream.concat(expectedFullRead, expectedReRead).collect(Collectors.toList());
+            assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+        }
+
         @Test
         void testStreamPatternSubscriber() throws Throwable {
             DynamicKafkaSourceTestHelper.createTopic(0, "stream-pattern-test-1", NUM_PARTITIONS);
@@ -621,7 +748,7 @@ public class DynamicKafkaSourceITTest extends TestLogger {
         private void writeClusterMetadataToFile(
                 File metadataFile,
                 String streamId,
-                String topic,
+                List<String> topics,
                 List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
                 throws IOException {
             List<YamlFileMetadataService.StreamMetadata.ClusterMetadata> clusterMetadata =
@@ -633,7 +760,7 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                                                     KafkaClusterTestEnvMetadata.getKafkaClusterId(),
                                                     KafkaClusterTestEnvMetadata
                                                             .getBrokerConnectionStrings(),
-                                                    ImmutableList.of(topic)))
+                                                    topics))
                             .collect(Collectors.toList());
             YamlFileMetadataService.StreamMetadata streamMetadata =
                     new YamlFileMetadataService.StreamMetadata(streamId, clusterMetadata);
@@ -641,6 +768,19 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                     Collections.singletonList(streamMetadata), metadataFile);
         }
 
+        private void writeClusterMetadataToFile(
+                File metadataFile,
+                String streamId,
+                String topic,
+                List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
+                throws IOException {
+            writeClusterMetadataToFile(
+                    metadataFile,
+                    streamId,
+                    ImmutableList.of(topic),
+                    kafkaClusterTestEnvMetadataList);
+        }
+
         private Set<String> findMetrics(InMemoryReporter inMemoryReporter, String groupPattern) {
             Optional<MetricGroup> groups = inMemoryReporter.findGroup(groupPattern);
             assertThat(groups).isPresent();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index 3c3a76e8..86133345 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
 import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
@@ -464,6 +466,91 @@ public class DynamicKafkaSourceEnumeratorTest {
         }
     }
 
+    @Test
+    public void testEnumeratorStateDoesNotContainStaleTopicPartitions() throws Throwable {
+        final String topic2 = TOPIC + "_2";
+
+        DynamicKafkaSourceTestHelper.createTopic(topic2, NUM_SPLITS_PER_CLUSTER, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                topic2, NUM_SPLITS_PER_CLUSTER, NUM_RECORDS_PER_SPLIT);
+
+        final Set<KafkaStream> initialStreams =
+                Collections.singleton(
+                        new KafkaStream(
+                                TOPIC,
+                                DynamicKafkaSourceTestHelper.getClusterMetadataMap(
+                                        0, TOPIC, topic2)));
+
+        final Set<KafkaStream> updatedStreams =
+                Collections.singleton(
+                        new KafkaStream(
+                                TOPIC,
+                                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC)));
+
+        try (MockKafkaMetadataService metadataService =
+                        new MockKafkaMetadataService(initialStreams);
+                MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                (properties) ->
+                                        properties.setProperty(
+                                                DynamicKafkaSourceOptions
+                                                        .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                        .key(),
+                                                "1"))) {
+            enumerator.start();
+
+            context.runPeriodicCallable(0);
+
+            runAllOneTimeCallables(context);
+
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 0);
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 1);
+
+            DynamicKafkaSourceEnumState initialState = enumerator.snapshotState(-1);
+
+            assertThat(getFilteredTopicPartitions(initialState, TOPIC, AssignmentStatus.ASSIGNED))
+                    .hasSize(2);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    initialState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .hasSize(1);
+            assertThat(getFilteredTopicPartitions(initialState, topic2, AssignmentStatus.ASSIGNED))
+                    .hasSize(2);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    initialState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .hasSize(1);
+
+            // mock metadata change
+            metadataService.setKafkaStreams(updatedStreams);
+
+            // changes should have occurred here
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 2);
+
+            DynamicKafkaSourceEnumState migratedState = enumerator.snapshotState(-1);
+
+            assertThat(getFilteredTopicPartitions(migratedState, TOPIC, AssignmentStatus.ASSIGNED))
+                    .hasSize(3);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    migratedState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .isEmpty();
+            assertThat(getFilteredTopicPartitions(migratedState, topic2, AssignmentStatus.ASSIGNED))
+                    .isEmpty();
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    migratedState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .isEmpty();
+        }
+    }
+
     @Test
     public void testStartupWithCheckpointState() throws Throwable {
         // init enumerator with checkpoint state
@@ -865,6 +952,18 @@ public class DynamicKafkaSourceEnumeratorTest {
         return readerToSplits;
     }
 
+    private List<TopicPartition> getFilteredTopicPartitions(
+            DynamicKafkaSourceEnumState state, String topic, AssignmentStatus assignmentStatus) {
+        return state.getClusterEnumeratorStates().values().stream()
+                .flatMap(s -> s.partitions().stream())
+                .filter(
+                        partition ->
+                                partition.topicPartition().topic().equals(topic)
+                                        && partition.assignmentStatus() == assignmentStatus)
+                .map(TopicPartitionAndAssignmentStatus::topicPartition)
+                .collect(Collectors.toList());
+    }
+
     private static void runAllOneTimeCallables(MockSplitEnumeratorContext context)
             throws Throwable {
         while (!context.getOneTimeCallables().isEmpty()) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
index 0ec02cc0..8eb0d28c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,21 +68,25 @@ public class DynamicKafkaSourceTestHelper extends KafkaTestBase {
         return kafkaClusters.get(kafkaClusterIdx).getKafkaClusterId();
     }
 
+    public static Map<String, ClusterMetadata> getClusterMetadataMap(
+            int kafkaClusterIdx, String... topics) {
+        KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
+                getKafkaClusterTestEnvMetadata(kafkaClusterIdx);
+
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+
+        ClusterMetadata clusterMetadata =
+                new ClusterMetadata(topicsSet, kafkaClusterTestEnvMetadata.getStandardProperties());
+
+        return Collections.singletonMap(
+                kafkaClusterTestEnvMetadata.getKafkaClusterId(), clusterMetadata);
+    }
+
     /** Stream is a topic across multiple clusters. */
     public static KafkaStream getKafkaStream(String topic) {
         Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
         for (int i = 0; i < NUM_KAFKA_CLUSTERS; i++) {
-            KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
-                    getKafkaClusterTestEnvMetadata(i);
-
-            Set<String> topics = new HashSet<>();
-            topics.add(topic);
-
-            ClusterMetadata clusterMetadata =
-                    new ClusterMetadata(
-                            topics, kafkaClusterTestEnvMetadata.getStandardProperties());
-            clusterMetadataMap.put(
-                    kafkaClusterTestEnvMetadata.getKafkaClusterId(), clusterMetadata);
+            clusterMetadataMap.putAll(getClusterMetadataMap(i, topic));
         }
 
         return new KafkaStream(topic, clusterMetadataMap);
