This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit e94e54b5201b08ba353133571ac2bd76392708c4 Author: Qingsheng Ren <renqs...@gmail.com> AuthorDate: Tue Jan 18 20:44:00 2022 +0800 [FLINK-25287][connectors/pulsar] Use connector testing framework interface for Pulsar tests --- .../util/pulsar/PulsarSourceOrderedE2ECase.java | 14 ++++---- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 12 +++---- .../pulsar/cases/ExclusiveSubscriptionContext.java | 11 +++++- .../pulsar/cases/FailoverSubscriptionContext.java | 11 +++++- .../pulsar/cases/KeySharedSubscriptionContext.java | 28 +++++++++++---- .../pulsar/cases/SharedSubscriptionContext.java | 28 +++++++++++---- .../common/KeyedPulsarPartitionDataWriter.java | 7 ++-- .../common/UnorderedSourceTestSuiteBase.java | 40 +++++++++++++++------- 8 files changed, 107 insertions(+), 44 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java index 9a499e6..8641f50 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java @@ -20,10 +20,10 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; -import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; -import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; -import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext; import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; @@ -41,16 +41,16 @@ public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> { FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); // Defines ConnectorExternalSystem. - @ExternalSystem + @TestExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager())); // Defines a set of external context Factories for different test cases. - @ExternalContextFactory + @TestContext PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive = new PulsarTestContextFactory<>(pulsar, ExclusiveSubscriptionContext::new); - @ExternalContextFactory + @TestContext PulsarTestContextFactory<String, FailoverSubscriptionContext> failover = new PulsarTestContextFactory<>(pulsar, FailoverSubscriptionContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java index 797c7b1..e519618 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java @@ -20,9 +20,9 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; -import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; -import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext; import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; @@ -41,16 +41,16 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8); // Defines ConnectorExternalSystem. - @ExternalSystem + @TestExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager())); // Defines a set of external context Factories for different test cases. - @ExternalContextFactory + @TestContext PulsarTestContextFactory<String, SharedSubscriptionContext> shared = new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new); - @ExternalContextFactory + @TestContext PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared = new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java index 18b2ffc..c3c4959 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java @@ -23,6 +23,10 @@ import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateCo import org.apache.pulsar.client.api.SubscriptionType; +import java.net.URL; +import java.util.Collections; +import java.util.List; + import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; @@ -31,7 +35,12 @@ public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext { private static final long serialVersionUID = 6238209089442257487L; public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) { - super(environment); + this(environment, Collections.emptyList()); + } + + public ExclusiveSubscriptionContext( + PulsarTestEnvironment environment, List<URL> connectorJarPaths) { + super(environment, connectorJarPaths); } @Override diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java index c322efa..9dbbec8 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java @@ -23,6 +23,10 @@ import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateCo import org.apache.pulsar.client.api.SubscriptionType; +import java.net.URL; +import java.util.Collections; +import java.util.List; + import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; @@ -31,7 +35,12 @@ public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { private static final long serialVersionUID = 6238209089442257487L; public FailoverSubscriptionContext(PulsarTestEnvironment environment) { - super(environment); + this(environment, Collections.emptyList()); + } + + public FailoverSubscriptionContext( + PulsarTestEnvironment environment, List<URL> connectorJarPaths) { + super(environment, connectorJarPaths); } @Override diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java index 5d937ba..0be3ac3 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.pulsar.cases; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.connector.pulsar.source.PulsarSource; @@ -28,14 +29,17 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; +import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.Murmur3_32Hash; +import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static java.util.Collections.singletonList; @@ -61,7 +65,12 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { private final String key2; public KeySharedSubscriptionContext(PulsarTestEnvironment environment) { - super(environment); + this(environment, Collections.emptyList()); + } + + public KeySharedSubscriptionContext( + PulsarTestEnvironment environment, List<URL> connectorJarPaths) { + super(environment, connectorJarPaths); // Init message keys. this.key1 = randomAlphabetic(8); @@ -78,7 +87,7 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public Source<String, ?, ?> createSource(Boundedness boundedness) { + public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) { int keyHash = keyHash(key1); TopicRange range = new TopicRange(keyHash, keyHash); @@ -92,7 +101,7 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { .setSubscriptionType(SubscriptionType.Key_Shared) .setSubscriptionName("pulsar-key-shared") .setRangeGenerator(new FixedRangeGenerator(singletonList(range))); - if (boundedness == Boundedness.BOUNDED) { + if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) { // Using latest stop cursor for making sure the source could be stopped. builder.setBoundedStopCursor(StopCursor.latest()); } @@ -101,7 +110,8 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public SourceSplitDataWriter<String> createSourceSplitDataWriter() { + public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( + TestingSourceSettings sourceSettings) { String topicName = "pulsar-" + index + "-key-shared"; operator.createTopic(topicName, 1); index++; @@ -115,10 +125,16 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public List<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData( + TestingSourceSettings sourceSettings, int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } + @Override + public TypeInformation<String> getProducedType() { + return TypeInformation.of(String.class); + } + @Override public void close() { for (KeyedPulsarPartitionDataWriter writer : writers) { diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java index 52e30b3..5a4ce75 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.pulsar.cases; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.connector.pulsar.source.PulsarSource; @@ -27,12 +28,15 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; +import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; @@ -49,7 +53,12 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { private final List<PulsarPartitionDataWriter> writers = new ArrayList<>(); public SharedSubscriptionContext(PulsarTestEnvironment environment) { - super(environment); + this(environment, Collections.emptyList()); + } + + public SharedSubscriptionContext( + PulsarTestEnvironment environment, List<URL> connectorJarPaths) { + super(environment, connectorJarPaths); } @Override @@ -58,7 +67,7 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public Source<String, ?, ?> createSource(Boundedness boundedness) { + public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) { PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) @@ -67,7 +76,7 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics) .setSubscriptionType(SubscriptionType.Shared) .setSubscriptionName("pulsar-shared"); - if (boundedness == Boundedness.BOUNDED) { + if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) { // Using latest stop cursor for making sure the source could be stopped. builder.setBoundedStopCursor(StopCursor.latest()); } @@ -76,7 +85,8 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public SourceSplitDataWriter<String> createSourceSplitDataWriter() { + public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( + TestingSourceSettings sourceSettings) { String topicName = "pulsar-" + index + "-shared"; operator.createTopic(topicName, 1); index++; @@ -89,10 +99,16 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public List<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData( + TestingSourceSettings sourceSettings, int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } + @Override + public TypeInformation<String> getProducedType() { + return TypeInformation.of(String.class); + } + @Override public void close() { for (PulsarPartitionDataWriter writer : writers) { diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java index eea97e6..e431e4c 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java @@ -19,11 +19,10 @@ package org.apache.flink.tests.util.pulsar.common; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; import org.apache.pulsar.client.api.Schema; -import java.util.Collection; import java.util.List; import static java.util.stream.Collectors.toList; @@ -32,7 +31,7 @@ import static java.util.stream.Collectors.toList; * Source split data writer for writing test data into a Pulsar topic partition. It will write the * message with two keys. */ -public class KeyedPulsarPartitionDataWriter implements SourceSplitDataWriter<String> { +public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWriter<String> { private final PulsarRuntimeOperator operator; private final String fullTopicName; @@ -48,7 +47,7 @@ public class KeyedPulsarPartitionDataWriter implements SourceSplitDataWriter<Str } @Override - public void writeRecords(Collection<String> records) { + public void writeRecords(List<String> records) { operator.sendMessages(fullTopicName, Schema.STRING, key1, records); List<String> newRecords = records.stream().map(a -> a + key1).collect(toList()); diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java index c452fe6..01527ea 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java @@ -21,20 +21,22 @@ package org.apache.flink.tests.util.pulsar.common; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; -import org.apache.flink.connectors.test.common.environment.TestEnvironment; -import org.apache.flink.connectors.test.common.external.ExternalContext; -import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; -import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension; -import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider; -import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; +import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; +import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension; +import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TestLoggerExtension; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import java.util.Collection; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -53,14 +55,26 @@ public abstract class UnorderedSourceTestSuiteBase<T> { @TestTemplate @DisplayName("Test source with one split and four consumers") public void testOneSplitWithMultipleConsumers( - TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception { - Collection<T> testData = - externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong()); - SourceSplitDataWriter<T> writer = externalContext.createSourceSplitDataWriter(); + TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext) + throws Exception { + TestingSourceSettings sourceSettings = + TestingSourceSettings.builder() + .setBoundedness(Boundedness.BOUNDED) + .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) + .build(); + TestEnvironmentSettings envOptions = + TestEnvironmentSettings.builder() + .setConnectorJarPaths(externalContext.getConnectorJarPaths()) + .build(); + List<T> testData = + externalContext.generateTestData( + sourceSettings, 0, ThreadLocalRandom.current().nextLong()); + ExternalSystemSplitDataWriter<T> writer = + externalContext.createSourceSplitDataWriter(sourceSettings); writer.writeRecords(testData); - Source<T, ?, ?> source = externalContext.createSource(Boundedness.BOUNDED); - StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); + Source<T, ?, ?> source = externalContext.createSource(sourceSettings); + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions); List<T> results = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source") .setParallelism(4)