This is an automated email from the ASF dual-hosted git repository.

masc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
     new 00c9c8c7  [FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster (#97)
00c9c8c7 is described below

commit 00c9c8c74121136a0c1710ac77f307dc53adae99
Author: Ignas Daukšas <ign...@gmail.com>
AuthorDate: Tue Apr 30 02:13:37 2024 +0300

    [FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster (#97)
---
 .../enumerator/DynamicKafkaSourceEnumerator.java   |  48 ++++---
 .../dynamic/source/DynamicKafkaSourceITTest.java   | 144 ++++++++++++++++++++-
 .../DynamicKafkaSourceEnumeratorTest.java          |  99 ++++++++++++++
 .../kafka/DynamicKafkaSourceTestHelper.java        |  27 ++--
 4 files changed, 284 insertions(+), 34 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index e14a36d9..20e8b923 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -288,23 +288,34 @@ public class DynamicKafkaSourceEnumerator
 
         // create enumerators
         for (Entry<String, Set<String>> activeClusterTopics : latestClusterTopicsMap.entrySet()) {
-            final Set<TopicPartition> activeTopicPartitions = new HashSet<>();
-
-            if (dynamicKafkaSourceEnumState
+            KafkaSourceEnumState kafkaSourceEnumState =
+                    dynamicKafkaSourceEnumState
                             .getClusterEnumeratorStates()
-                    .get(activeClusterTopics.getKey())
-                    != null) {
-                Set<TopicPartition> oldTopicPartitions =
-                        dynamicKafkaSourceEnumState
-                                .getClusterEnumeratorStates()
-                                .get(activeClusterTopics.getKey())
-                                .assignedPartitions();
+                            .get(activeClusterTopics.getKey());
+
+            final KafkaSourceEnumState newKafkaSourceEnumState;
+            if (kafkaSourceEnumState != null) {
+                final Set<String> activeTopics = activeClusterTopics.getValue();
+
                 // filter out removed topics
-                for (TopicPartition oldTopicPartition : oldTopicPartitions) {
-                    if (activeClusterTopics.getValue().contains(oldTopicPartition.topic())) {
-                        activeTopicPartitions.add(oldTopicPartition);
-                    }
-                }
+                Set<TopicPartition> activeAssignedPartitions =
+                        kafkaSourceEnumState.assignedPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+                Set<TopicPartition> activeUnassignedInitialPartitions =
+                        kafkaSourceEnumState.unassignedInitialPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                activeAssignedPartitions,
+                                activeUnassignedInitialPartitions,
+                                kafkaSourceEnumState.initialDiscoveryFinished());
+            } else {
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false);
             }
 
             // restarts enumerator from state using only the active topic partitions, to avoid
@@ -312,12 +323,7 @@ public class DynamicKafkaSourceEnumerator
                     createEnumeratorWithAssignedTopicPartitions(
                             activeClusterTopics.getKey(),
                             activeClusterTopics.getValue(),
-                            dynamicKafkaSourceEnumState
-                                    .getClusterEnumeratorStates()
-                                    .getOrDefault(
-                                            activeClusterTopics.getKey(),
-                                            new KafkaSourceEnumState(
-                                                    Collections.emptySet(), Collections.emptySet(), false)),
+                            newKafkaSourceEnumState,
                             clusterProperties.get(activeClusterTopics.getKey()));
         }
 
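The enumerator hunk above is the heart of the fix: when per-cluster state was restored, only the assigned partitions were filtered against the active topic set, so the state handed to the restarted sub-enumerator could still contain partitions of topics no longer in the metadata. A topic that was removed and later re-added could then be treated as already discovered and never read again (FLINK-35228). The new code applies the same active-topic filter to both assignedPartitions() and unassignedInitialPartitions() and rebuilds the KafkaSourceEnumState explicitly. A minimal standalone sketch of that filter pattern, assuming only kafka-clients on the classpath (the class and method names below are illustrative, not part of the connector):

    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.kafka.common.TopicPartition;

    // Illustrative helper mirroring the filter now applied to both
    // assignedPartitions() and unassignedInitialPartitions().
    final class ActiveTopicFilter {

        /** Keeps only the partitions whose topic is still in the active metadata. */
        static Set<TopicPartition> retainActiveTopics(
                Set<TopicPartition> partitions, Set<String> activeTopics) {
            return partitions.stream()
                    .filter(tp -> activeTopics.contains(tp.topic()))
                    .collect(Collectors.toSet());
        }
    }

With both sets filtered, a topic that reappears in the metadata is rediscovered as new and its partitions are assigned again rather than silently skipped.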
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index 4ea1bd7d..edd86756 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -372,6 +373,132 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                         .collect(Collectors.toList()));
     }
 
+    @Test
+    void testTopicReAddMigrationUsingFileMetadataService() throws Throwable {
+        // setup topics
+        int kafkaClusterIdx = 0;
+        String topic1 = "test-topic-re-add-1";
+        String topic2 = "test-topic-re-add-2";
+        DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic1, NUM_PARTITIONS);
+        DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic2, NUM_PARTITIONS);
+
+        // Flink job config and env
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+        Properties properties = new Properties();
+        properties.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "1000");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "5000");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD.key(),
+                "2");
+        properties.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, "dynamic-kafka-src");
+
+        // create new metadata file to consume from 1 cluster
+        String testStreamId = "test-topic-re-add-stream";
+        File metadataFile = File.createTempFile(testDir.getPath() + "/metadata", ".yaml");
+        YamlFileMetadataService yamlFileMetadataService =
+                new YamlFileMetadataService(metadataFile.getPath(), Duration.ofMillis(100));
+        writeClusterMetadataToFile(
+                metadataFile,
+                testStreamId,
+                ImmutableList.of(topic1),
+                ImmutableList.of(
+                        DynamicKafkaSourceTestHelper.getKafkaClusterTestEnvMetadata(
+                                kafkaClusterIdx)));
+
+        DynamicKafkaSource<Integer> dynamicKafkaSource =
+                DynamicKafkaSource.<Integer>builder()
+                        .setStreamIds(Collections.singleton(testStreamId))
+                        .setKafkaMetadataService(yamlFileMetadataService)
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
+                                        IntegerDeserializer.class))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setProperties(properties)
+                        .build();
+
+        DataStreamSource<Integer> stream =
+                env.fromSource(
+                        dynamicKafkaSource,
+                        WatermarkStrategy.noWatermarks(),
+                        "dynamic-kafka-src");
+        List<Integer> results = new ArrayList<>();
+
+        int stage1Records =
+                DynamicKafkaSourceTestHelper.produceToKafka(
+                        kafkaClusterIdx, topic1, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, 0);
+        int stage2Records =
+                DynamicKafkaSourceTestHelper.produceToKafka(
+                        kafkaClusterIdx,
+                        topic2,
+                        NUM_PARTITIONS,
+                        NUM_RECORDS_PER_SPLIT,
+                        stage1Records);
+
+        try (CloseableIterator<Integer> iterator = stream.executeAndCollect()) {
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        try {
+                            results.add(iterator.next());
+
+                            // switch to second topic after first is read
+                            if (results.size() == stage1Records) {
+                                writeClusterMetadataToFile(
+                                        metadataFile,
+                                        testStreamId,
+                                        ImmutableList.of(topic2),
+                                        ImmutableList.of(
+                                                DynamicKafkaSourceTestHelper
+                                                        .getKafkaClusterTestEnvMetadata(
+                                                                kafkaClusterIdx)));
+                            }
+
+                            // re-add first topic again after second is read
+                            // produce another batch to first topic
+                            if (results.size() == stage2Records) {
+                                DynamicKafkaSourceTestHelper.produceToKafka(
+                                        kafkaClusterIdx,
+                                        topic1,
+                                        NUM_PARTITIONS,
+                                        NUM_RECORDS_PER_SPLIT,
+                                        stage2Records);
+                                writeClusterMetadataToFile(
+                                        metadataFile,
+                                        testStreamId,
+                                        ImmutableList.of(topic1, topic2),
+                                        ImmutableList.of(
+                                                DynamicKafkaSourceTestHelper
+                                                        .getKafkaClusterTestEnvMetadata(
+                                                                kafkaClusterIdx)));
+                            }
+                        } catch (NoSuchElementException e) {
+                            // swallow and wait
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        } catch (Throwable e) {
+                            throw new RuntimeException(e);
+                        }
+
+                        // first batch of topic 1 * 2 + topic 2 + second batch of topic 1
+                        return results.size() == NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 4;
+                    },
+                    Duration.ofSeconds(15),
+                    "Could not schedule callable within timeout");
+        }
+
+        // verify data
+        Stream<Integer> expectedFullRead =
+                IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 3).boxed();
+        Stream<Integer> expectedReRead =
+                IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT).boxed();
+        List<Integer> expectedResults =
+                Stream.concat(expectedFullRead, expectedReRead).collect(Collectors.toList());
+        assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+    }
+
     @Test
     void testStreamPatternSubscriber() throws Throwable {
         DynamicKafkaSourceTestHelper.createTopic(0, "stream-pattern-test-1", NUM_PARTITIONS);
@@ -621,7 +748,7 @@ public class DynamicKafkaSourceITTest extends TestLogger {
     private void writeClusterMetadataToFile(
             File metadataFile,
             String streamId,
-            String topic,
+            List<String> topics,
             List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
             throws IOException {
         List<YamlFileMetadataService.StreamMetadata.ClusterMetadata> clusterMetadata =
@@ -633,7 +760,7 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                                                 KafkaClusterTestEnvMetadata.getKafkaClusterId(),
                                                 KafkaClusterTestEnvMetadata
                                                         .getBrokerConnectionStrings(),
-                                                ImmutableList.of(topic)))
+                                                topics))
                         .collect(Collectors.toList());
         YamlFileMetadataService.StreamMetadata streamMetadata =
                 new YamlFileMetadataService.StreamMetadata(streamId, clusterMetadata);
@@ -641,6 +768,19 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                 Collections.singletonList(streamMetadata), metadataFile);
     }
 
+    private void writeClusterMetadataToFile(
+            File metadataFile,
+            String streamId,
+            String topic,
+            List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
+            throws IOException {
+        writeClusterMetadataToFile(
+                metadataFile,
+                streamId,
+                ImmutableList.of(topic),
+                kafkaClusterTestEnvMetadataList);
+    }
+
     private Set<String> findMetrics(InMemoryReporter inMemoryReporter, String groupPattern) {
         Optional<MetricGroup> groups = inMemoryReporter.findGroup(groupPattern);
         assertThat(groups).isPresent();
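A note on the arithmetic behind the new test's wait condition and expected results: the job consumes three produced batches, and because the source starts from OffsetsInitializer.earliest(), re-adding topic1 makes its first batch get read a second time. A hedged sketch of the count, with placeholder values standing in for the test-base constants (the real NUM_PARTITIONS and NUM_RECORDS_PER_SPLIT are defined in the shared Kafka test base and are not shown in this patch):

    /** Hedged sketch only; the numbers are assumed, not the real constants. */
    public class ReAddCountSketch {
        public static void main(String[] args) {
            int numPartitions = 2;      // stand-in for NUM_PARTITIONS
            int recordsPerSplit = 5;    // stand-in for NUM_RECORDS_PER_SPLIT
            int batch = numPartitions * recordsPerSplit;

            // Stage 1: topic1 read once                  -> values [0, batch)
            // Stage 2: topic2 read once                  -> values [batch, 2 * batch)
            // Stage 3: topic1 re-added with a new batch  -> values [2 * batch, 3 * batch),
            //          and its first batch is re-read from earliest -> [0, batch) again
            System.out.println("expected total = " + (4 * batch));
        }
    }

This is why the test expects NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 4 records overall: one full pass over all three batches plus the re-read of topic1's first batch.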
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index 3c3a76e8..86133345 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
 import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
 import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
@@ -464,6 +466,91 @@ public class DynamicKafkaSourceEnumeratorTest {
         }
     }
 
+    @Test
+    public void testEnumeratorStateDoesNotContainStaleTopicPartitions() throws Throwable {
+        final String topic2 = TOPIC + "_2";
+
+        DynamicKafkaSourceTestHelper.createTopic(topic2, NUM_SPLITS_PER_CLUSTER, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                topic2, NUM_SPLITS_PER_CLUSTER, NUM_RECORDS_PER_SPLIT);
+
+        final Set<KafkaStream> initialStreams =
+                Collections.singleton(
+                        new KafkaStream(
+                                TOPIC,
+                                DynamicKafkaSourceTestHelper.getClusterMetadataMap(
+                                        0, TOPIC, topic2)));
+
+        final Set<KafkaStream> updatedStreams =
+                Collections.singleton(
+                        new KafkaStream(
+                                TOPIC,
+                                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC)));
+
+        try (MockKafkaMetadataService metadataService =
+                        new MockKafkaMetadataService(initialStreams);
+                MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                (properties) ->
+                                        properties.setProperty(
+                                                DynamicKafkaSourceOptions
+                                                        .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                        .key(),
+                                                "1"))) {
+            enumerator.start();
+
+            context.runPeriodicCallable(0);
+
+            runAllOneTimeCallables(context);
+
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 0);
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 1);
+
+            DynamicKafkaSourceEnumState initialState = enumerator.snapshotState(-1);
+
+            assertThat(getFilteredTopicPartitions(initialState, TOPIC, AssignmentStatus.ASSIGNED))
+                    .hasSize(2);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    initialState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .hasSize(1);
+            assertThat(getFilteredTopicPartitions(initialState, topic2, AssignmentStatus.ASSIGNED))
+                    .hasSize(2);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    initialState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .hasSize(1);
+
+            // mock metadata change
+            metadataService.setKafkaStreams(updatedStreams);
+
+            // changes should have occurred here
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 2);
+
+            DynamicKafkaSourceEnumState migratedState = enumerator.snapshotState(-1);
+
+            assertThat(getFilteredTopicPartitions(migratedState, TOPIC, AssignmentStatus.ASSIGNED))
+                    .hasSize(3);
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    migratedState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .isEmpty();
+            assertThat(getFilteredTopicPartitions(migratedState, topic2, AssignmentStatus.ASSIGNED))
+                    .isEmpty();
+            assertThat(
+                            getFilteredTopicPartitions(
+                                    migratedState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
+                    .isEmpty();
+        }
+    }
+
     @Test
     public void testStartupWithCheckpointState() throws Throwable {
         // init enumerator with checkpoint state
@@ -865,6 +952,18 @@ public class DynamicKafkaSourceEnumeratorTest {
         return readerToSplits;
     }
 
+    private List<TopicPartition> getFilteredTopicPartitions(
+            DynamicKafkaSourceEnumState state, String topic, AssignmentStatus assignmentStatus) {
+        return state.getClusterEnumeratorStates().values().stream()
+                .flatMap(s -> s.partitions().stream())
+                .filter(
+                        partition ->
+                                partition.topicPartition().topic().equals(topic)
+                                        && partition.assignmentStatus() == assignmentStatus)
+                .map(TopicPartitionAndAssignmentStatus::topicPartition)
+                .collect(Collectors.toList());
+    }
+
     private static void runAllOneTimeCallables(MockSplitEnumeratorContext context)
             throws Throwable {
         while (!context.getOneTimeCallables().isEmpty()) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
index 0ec02cc0..8eb0d28c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,21 +68,25 @@ public class DynamicKafkaSourceTestHelper extends KafkaTestBase {
         return kafkaClusters.get(kafkaClusterIdx).getKafkaClusterId();
     }
 
+    public static Map<String, ClusterMetadata> getClusterMetadataMap(
+            int kafkaClusterIdx, String... topics) {
+        KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
+                getKafkaClusterTestEnvMetadata(kafkaClusterIdx);
+
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+
+        ClusterMetadata clusterMetadata =
+                new ClusterMetadata(topicsSet, kafkaClusterTestEnvMetadata.getStandardProperties());
+
+        return Collections.singletonMap(
+                kafkaClusterTestEnvMetadata.getKafkaClusterId(), clusterMetadata);
+    }
+
     /** Stream is a topic across multiple clusters. */
     public static KafkaStream getKafkaStream(String topic) {
         Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
         for (int i = 0; i < NUM_KAFKA_CLUSTERS; i++) {
-            KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
-                    getKafkaClusterTestEnvMetadata(i);
-
-            Set<String> topics = new HashSet<>();
-            topics.add(topic);
-
-            ClusterMetadata clusterMetadata =
-                    new ClusterMetadata(
-                            topics, kafkaClusterTestEnvMetadata.getStandardProperties());
-            clusterMetadataMap.put(
-                    kafkaClusterTestEnvMetadata.getKafkaClusterId(), clusterMetadata);
+            clusterMetadataMap.putAll(getClusterMetadataMap(i, topic));
         }
 
         return new KafkaStream(topic, clusterMetadataMap);
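Finally, the getClusterMetadataMap(int, String...) helper extracted above is what lets the new enumerator test describe the same stream with different topic sets before and after a metadata change. A hedged usage sketch, assuming a test with access to the helper's cluster setup (the stream id and topic names here are made up, and cluster index 0 assumes at least one test cluster is running):

    // Stream covering two topics on cluster 0 ...
    KafkaStream before =
            new KafkaStream(
                    "my-stream",
                    DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, "topic-a", "topic-b"));

    // ... and the same stream after "topic-b" has been removed from the metadata.
    KafkaStream after =
            new KafkaStream(
                    "my-stream",
                    DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, "topic-a"));

Feeding the second view through MockKafkaMetadataService.setKafkaStreams, as the test does, is what exercises the stale-partition filtering added in this commit.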