This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit b0a1678f91dae9b7bee35f77c79b441f365499be Author: Maximilian Michels <m...@apache.org> AuthorDate: Thu Sep 1 16:15:33 2022 +0200 [FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests --- .../split/PulsarPartitionSplitReaderBase.java | 9 +++--- .../source/PulsarOrderedSourceReaderTest.java | 32 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java index c1459a6..b884c57 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java @@ -162,7 +162,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT> List<PulsarPartitionSplit> newSplits = splitsChanges.splits(); Preconditions.checkArgument( - newSplits.size() == 1, "This pulsar split reader only support one split."); + newSplits.size() == 1, "This pulsar split reader only supports one split."); this.registeredSplit = newSplits.get(0); // Open stop cursor. @@ -184,9 +184,10 @@ abstract class PulsarPartitionSplitReaderBase<OUT> public void pauseOrResumeSplits( Collection<PulsarPartitionSplit> splitsToPause, Collection<PulsarPartitionSplit> splitsToResume) { - if (splitsToPause.size() > 1 || splitsToResume.size() > 1) { - throw new IllegalStateException("This pulsar split reader only support one split."); - } + // This shouldn't happen but just in case... + Preconditions.checkState( + splitsToPause.size() + splitsToResume.size() <= 1, + "This pulsar split reader only supports one split."); if (!splitsToPause.isEmpty()) { pulsarConsumer.pause(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java index 9806108..4ec1912 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java @@ -29,11 +29,13 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.Timeout; import java.time.Duration; import java.util.Collections; @@ -130,6 +132,36 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase { } } + @TestTemplate + @Timeout(600) + void supportsPausingOrResumingSplits( + PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName) + throws Exception { + final PulsarPartitionSplit split = + createPartitionSplit(topicName, 0, boundedness, MessageId.earliest); + + reader.addSplits(Collections.singletonList(split)); + + TestingReaderOutput<Integer> output = new TestingReaderOutput<>(); + + reader.pauseOrResumeSplits( + Collections.singletonList(split.splitId()), Collections.emptyList()); + + InputStatus status = reader.pollNext(output); + assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE); + + reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton(split.splitId())); + + do { + status = reader.pollNext(output); + Thread.sleep(5); + } while (status != InputStatus.MORE_AVAILABLE); + + assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE); + + reader.close(); + } + private void setupSourceReader( PulsarSourceReaderBase<Integer> reader, String topicName,