This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit bc744f2ab88872c1cf0b56e1b2408f5cde8e4950
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Mar 25 09:25:57 2022 +0100

    [FLINK-28853][kafka] Implement pausing Kafka partitions in case of watermark drift
---
 .../kafka/source/reader/KafkaPartitionSplitReader.java   | 14 ++++++++++++++
 .../connector/kafka/source/reader/KafkaSourceReader.java |  7 +++++++
 2 files changed, 21 insertions(+)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 0078d78..c440fc2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -227,6 +227,20 @@ public class KafkaPartitionSplitReader
         consumer.close();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<KafkaPartitionSplit> splitsToPause,
+            Collection<KafkaPartitionSplit> splitsToResume) {
+        consumer.resume(
+                splitsToResume.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));
+        consumer.pause(
+                splitsToPause.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));
+    }
+
     // ---------------
 
     public void notifyCheckpointComplete(
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index a3aa555..e8fe3d7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -185,6 +186,12 @@ public class KafkaSourceReader<T>
         return splitState.toKafkaPartitionSplit();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
     // ------------------------
 
     @VisibleForTesting