This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit bc744f2ab88872c1cf0b56e1b2408f5cde8e4950
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Mar 25 09:25:57 2022 +0100

    [FLINK-28853][kafka] Implement pausing Kafka partitions in case of 
watermark drift
---
 .../kafka/source/reader/KafkaPartitionSplitReader.java     | 14 ++++++++++++++
 .../connector/kafka/source/reader/KafkaSourceReader.java   |  7 +++++++
 2 files changed, 21 insertions(+)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 0078d78..c440fc2 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -227,6 +227,20 @@ public class KafkaPartitionSplitReader
         consumer.close();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<KafkaPartitionSplit> splitsToPause,
+            Collection<KafkaPartitionSplit> splitsToResume) {
+        consumer.resume(
+                splitsToResume.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));
+        consumer.pause(
+                splitsToPause.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));
+    }
+
     // ---------------
 
     public void notifyCheckpointComplete(
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index a3aa555..e8fe3d7 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -185,6 +186,12 @@ public class KafkaSourceReader<T>
         return splitState.toKafkaPartitionSplit();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
     // ------------------------
 
     @VisibleForTesting

Reply via email to