This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit e94e54b5201b08ba353133571ac2bd76392708c4
Author: Qingsheng Ren <renqs...@gmail.com>
AuthorDate: Tue Jan 18 20:44:00 2022 +0800

    [FLINK-25287][connectors/pulsar] Use connector testing framework interface 
for Pulsar tests
---
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    | 14 ++++----
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  | 12 +++----
 .../pulsar/cases/ExclusiveSubscriptionContext.java | 11 +++++-
 .../pulsar/cases/FailoverSubscriptionContext.java  | 11 +++++-
 .../pulsar/cases/KeySharedSubscriptionContext.java | 28 +++++++++++----
 .../pulsar/cases/SharedSubscriptionContext.java    | 28 +++++++++++----
 .../common/KeyedPulsarPartitionDataWriter.java     |  7 ++--
 .../common/UnorderedSourceTestSuiteBase.java       | 40 +++++++++++++++-------
 8 files changed, 107 insertions(+), 44 deletions(-)

diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 9a499e6..8641f50 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -20,10 +20,10 @@ package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
-import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
-import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
-import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -41,16 +41,16 @@ public class PulsarSourceOrderedE2ECase extends 
SourceTestSuiteBase<String> {
     FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 6);
 
     // Defines ConnectorExternalSystem.
-    @ExternalSystem
+    @TestExternalSystem
     PulsarTestEnvironment pulsar =
             new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
 
     // Defines a set of external context Factories for different test cases.
-    @ExternalContextFactory
+    @TestContext
     PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive =
             new PulsarTestContextFactory<>(pulsar, 
ExclusiveSubscriptionContext::new);
 
-    @ExternalContextFactory
+    @TestContext
     PulsarTestContextFactory<String, FailoverSubscriptionContext> failover =
             new PulsarTestContextFactory<>(pulsar, 
FailoverSubscriptionContext::new);
 }
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 797c7b1..e519618 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -20,9 +20,9 @@ package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
-import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
-import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
 import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -41,16 +41,16 @@ public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<S
     FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 8);
 
     // Defines ConnectorExternalSystem.
-    @ExternalSystem
+    @TestExternalSystem
     PulsarTestEnvironment pulsar =
             new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
 
     // Defines a set of external context Factories for different test cases.
-    @ExternalContextFactory
+    @TestContext
     PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
             new PulsarTestContextFactory<>(pulsar, 
SharedSubscriptionContext::new);
 
-    @ExternalContextFactory
+    @TestContext
     PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
             new PulsarTestContextFactory<>(pulsar, 
KeySharedSubscriptionContext::new);
 }
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
index 18b2ffc..c3c4959 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -23,6 +23,10 @@ import 
org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateCo
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 
@@ -31,7 +35,12 @@ public class ExclusiveSubscriptionContext extends 
MultipleTopicTemplateContext {
     private static final long serialVersionUID = 6238209089442257487L;
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
+        this(environment, Collections.emptyList());
+    }
+
+    public ExclusiveSubscriptionContext(
+            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+        super(environment, connectorJarPaths);
     }
 
     @Override
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
index c322efa..9dbbec8 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -23,6 +23,10 @@ import 
org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateCo
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 
@@ -31,7 +35,12 @@ public class FailoverSubscriptionContext extends 
MultipleTopicTemplateContext {
     private static final long serialVersionUID = 6238209089442257487L;
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
+        this(environment, Collections.emptyList());
+    }
+
+    public FailoverSubscriptionContext(
+            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+        super(environment, connectorJarPaths);
     }
 
     @Override
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index 5d937ba..0be3ac3 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.pulsar.cases;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
@@ -28,14 +29,17 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 import 
org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static java.util.Collections.singletonList;
@@ -61,7 +65,12 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
     private final String key2;
 
     public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
+        this(environment, Collections.emptyList());
+    }
+
+    public KeySharedSubscriptionContext(
+            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+        super(environment, connectorJarPaths);
 
         // Init message keys.
         this.key1 = randomAlphabetic(8);
@@ -78,7 +87,7 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+    public Source<String, ?, ?> createSource(TestingSourceSettings 
sourceSettings) {
         int keyHash = keyHash(key1);
         TopicRange range = new TopicRange(keyHash, keyHash);
 
@@ -92,7 +101,7 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
                         .setSubscriptionType(SubscriptionType.Key_Shared)
                         .setSubscriptionName("pulsar-key-shared")
                         .setRangeGenerator(new 
FixedRangeGenerator(singletonList(range)));
-        if (boundedness == Boundedness.BOUNDED) {
+        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
             // Using latest stop cursor for making sure the source could be 
stopped.
             builder.setBoundedStopCursor(StopCursor.latest());
         }
@@ -101,7 +110,8 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
         String topicName = "pulsar-" + index + "-key-shared";
         operator.createTopic(topicName, 1);
         index++;
@@ -115,10 +125,16 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public List<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(
+            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+
     @Override
     public void close() {
         for (KeyedPulsarPartitionDataWriter writer : writers) {
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index 52e30b3..5a4ce75 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.pulsar.cases;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
@@ -27,12 +28,15 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
@@ -49,7 +53,12 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
     private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
 
     public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
+        this(environment, Collections.emptyList());
+    }
+
+    public SharedSubscriptionContext(
+            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+        super(environment, connectorJarPaths);
     }
 
     @Override
@@ -58,7 +67,7 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+    public Source<String, ?, ?> createSource(TestingSourceSettings 
sourceSettings) {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
@@ -67,7 +76,7 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
                         .setTopicPattern("pulsar-[0-9]+-shared", 
RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Shared)
                         .setSubscriptionName("pulsar-shared");
-        if (boundedness == Boundedness.BOUNDED) {
+        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
             // Using latest stop cursor for making sure the source could be 
stopped.
             builder.setBoundedStopCursor(StopCursor.latest());
         }
@@ -76,7 +85,8 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
         String topicName = "pulsar-" + index + "-shared";
         operator.createTopic(topicName, 1);
         index++;
@@ -89,10 +99,16 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public List<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(
+            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+
     @Override
     public void close() {
         for (PulsarPartitionDataWriter writer : writers) {
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
index eea97e6..e431e4c 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -19,11 +19,10 @@
 package org.apache.flink.tests.util.pulsar.common;
 
 import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 
 import org.apache.pulsar.client.api.Schema;
 
-import java.util.Collection;
 import java.util.List;
 
 import static java.util.stream.Collectors.toList;
@@ -32,7 +31,7 @@ import static java.util.stream.Collectors.toList;
  * Source split data writer for writing test data into a Pulsar topic 
partition. It will write the
  * message with two keys.
  */
-public class KeyedPulsarPartitionDataWriter implements 
SourceSplitDataWriter<String> {
+public class KeyedPulsarPartitionDataWriter implements 
ExternalSystemSplitDataWriter<String> {
 
     private final PulsarRuntimeOperator operator;
     private final String fullTopicName;
@@ -48,7 +47,7 @@ public class KeyedPulsarPartitionDataWriter implements 
SourceSplitDataWriter<Str
     }
 
     @Override
-    public void writeRecords(Collection<String> records) {
+    public void writeRecords(List<String> records) {
         operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
 
         List<String> newRecords = records.stream().map(a -> a + 
key1).collect(toList());
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
index c452fe6..01527ea 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
@@ -21,20 +21,22 @@ package org.apache.flink.tests.util.pulsar.common;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connectors.test.common.environment.TestEnvironment;
-import org.apache.flink.connectors.test.common.external.ExternalContext;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
-import 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
-import 
org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
-import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import 
org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -53,14 +55,26 @@ public abstract class UnorderedSourceTestSuiteBase<T> {
     @TestTemplate
     @DisplayName("Test source with one split and four consumers")
     public void testOneSplitWithMultipleConsumers(
-            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
-        Collection<T> testData =
-                externalContext.generateTestData(0, 
ThreadLocalRandom.current().nextLong());
-        SourceSplitDataWriter<T> writer = 
externalContext.createSourceSplitDataWriter();
+            TestEnvironment testEnv, DataStreamSourceExternalContext<T> 
externalContext)
+            throws Exception {
+        TestingSourceSettings sourceSettings =
+                TestingSourceSettings.builder()
+                        .setBoundedness(Boundedness.BOUNDED)
+                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+                        .build();
+        TestEnvironmentSettings envOptions =
+                TestEnvironmentSettings.builder()
+                        
.setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                        .build();
+        List<T> testData =
+                externalContext.generateTestData(
+                        sourceSettings, 0, 
ThreadLocalRandom.current().nextLong());
+        ExternalSystemSplitDataWriter<T> writer =
+                externalContext.createSourceSplitDataWriter(sourceSettings);
         writer.writeRecords(testData);
 
-        Source<T, ?, ?> source = 
externalContext.createSource(Boundedness.BOUNDED);
-        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        Source<T, ?, ?> source = externalContext.createSource(sourceSettings);
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment(envOptions);
         List<T> results =
                 execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), 
"Pulsar source")
                         .setParallelism(4)

Reply via email to