This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d21147e8bc3dab7d42ecd7b1fa48d2736ff7b918
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Mar 25 09:33:33 2022 +0100

    [FLINK-28853][pulsar] Implement pausing Pulsar splits in case of watermark drift
---
 .../source/reader/source/PulsarSourceReaderBase.java     |  8 ++++++++
 .../reader/split/PulsarPartitionSplitReaderBase.java     | 16 ++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
index 0122021..c910e94 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
@@ -32,6 +32,8 @@ import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 
+import java.util.Collection;
+
 /**
  * The common pulsar source reader for both ordered & unordered message consuming.
  *
@@ -75,6 +77,12 @@ abstract class PulsarSourceReaderBase<OUT>
         return splitState.toPulsarPartitionSplit();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
     @Override
     public void close() throws Exception {
         // Close the all the consumers first.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 9537969..c1459a6 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -179,6 +180,21 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
         LOG.info("Register split {} consumer for current reader.", registeredSplit);
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<PulsarPartitionSplit> splitsToPause,
+            Collection<PulsarPartitionSplit> splitsToResume) {
+        if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
+            throw new IllegalStateException("This pulsar split reader only support one split.");
+        }
+
+        if (!splitsToPause.isEmpty()) {
+            pulsarConsumer.pause();
+        } else if (!splitsToResume.isEmpty()) {
+            pulsarConsumer.resume();
+        }
+    }
+
     @Override
     public void wakeUp() {
         // Nothing to do on this method.

Reply via email to