This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b608620  [FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector.
b608620 is described below

commit b6086203d5fc0a08a330dd0069fbe1359ceac97a
Author: syhily <syh...@gmail.com>
AuthorDate: Mon Sep 13 21:52:31 2021 +0800

    [FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector.
---
 flink-connectors/flink-connector-pulsar/pom.xml    |  47 ++++++++
 .../pulsar/source/enumerator/topic/TopicRange.java |   2 +
 .../topic/range/FixedRangeGenerator.java}          |  29 ++---
 .../pulsar/source/PulsarSourceITCase.java          |   2 +-
 .../testutils/PulsarPartitionDataWriter.java       |  28 ++---
 .../pulsar/testutils/PulsarTestContext.java        |   9 +-
 .../pulsar/testutils/PulsarTestEnvironment.java    |  19 ++--
 .../pulsar/testutils/PulsarTestSuiteBase.java      |   2 +-
 .../cases/MultipleTopicConsumingContext.java       |  83 ++------------
 ...text.java => MultipleTopicTemplateContext.java} |  46 ++++----
 .../cases/SingleTopicConsumingContext.java         |  16 ++-
 .../pulsar/testutils/runtime/PulsarRuntime.java    |  39 ++++---
 .../testutils/runtime/PulsarRuntimeOperator.java   |  21 +++-
 ...erProvider.java => PulsarContainerRuntime.java} |  28 ++++-
 ...sarMockProvider.java => PulsarMockRuntime.java} |  41 +++++--
 .../util/flink/FlinkContainerTestEnvironment.java  |  14 ++-
 .../flink-end-to-end-tests-pulsar/pom.xml          | 121 +++++++++++++++++++++
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |  40 ++++---
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  55 ++++++++++
 .../pulsar/cases/ExclusiveSubscriptionContext.java |  61 +++++++++++
 .../pulsar/cases/FailoverSubscriptionContext.java  |  61 +++++++++++
 .../pulsar/cases/KeySharedSubscriptionContext.java |  93 ++++++++++------
 .../pulsar/cases/SharedSubscriptionContext.java    |  67 +++++-------
 .../FlinkContainerWithPulsarEnvironment.java       |  54 +++++++++
 .../common/KeyedPulsarPartitionDataWriter.java     |  62 +++++++++++
 .../common/UnorderedSourceTestSuiteBase.java       |  72 ++++++++++++
 .../src/test/resources/log4j2-test.properties      |  31 ++----
 flink-end-to-end-tests/pom.xml                     |   1 +
 .../modules-skipping-deployment.modulelist         |   1 +
 29 files changed, 846 insertions(+), 299 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 9992ef9..c3e5e9a 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -169,6 +169,12 @@ under the License.
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-client-all</artifactId>
                        <version>${pulsar.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.pulsar</groupId>
+                                       
<artifactId>pulsar-package-core</artifactId>
+                               </exclusion>
+                       </exclusions>
                </dependency>
        </dependencies>
 
@@ -258,6 +264,47 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               
<include>**/testutils/**</include>
+                                                               
<include>META-INF/LICENSE</include>
+                                                               
<include>META-INF/NOTICE</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>attach-test-sources</id>
+                                               <goals>
+                                                       
<goal>test-jar-no-fork</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <archive>
+                                                               <!-- Globally 
exclude maven metadata, because it may accidentally bundle files we don't 
intend to -->
+                                                               
<addMavenDescriptor>false</addMavenDescriptor>
+                                                       </archive>
+                                                       <includes>
+                                                               
<include>**/testutils/**</include>
+                                                               
<include>META-INF/LICENSE</include>
+                                                               
<include>META-INF/NOTICE</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 </project>
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
index 1d1574b..5b77922 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
@@ -57,6 +57,8 @@ public class TopicRange implements Serializable {
     public TopicRange(int start, int end) {
         checkArgument(start >= MIN_RANGE, "Start range %s shouldn't below zero.", start);
         checkArgument(end <= MAX_RANGE, "End range %s shouldn't exceed 65535.", end);
+        checkArgument(start <= end, "Range end must >= range start.");
+
         this.start = start;
         this.end = end;
     }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
similarity index 53%
rename from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
rename to 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
index d8ad718..6f82725 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
@@ -16,22 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.runtime;
+package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
 
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 
-/**
- * A abstraction for different pulsar runtimes. Providing the common methods 
for {@link
- * PulsarTestEnvironment}.
- */
-public interface PulsarRuntimeProvider {
+import java.util.List;
+
+/** Always return the same range set for all topics. */
+public class FixedRangeGenerator implements RangeGenerator {
+    private static final long serialVersionUID = -3895203007855538734L;
 
-    /** Start up this pulsar runtime, block the thread until everytime is 
ready for this runtime. */
-    void startUp();
+    private final List<TopicRange> ranges;
 
-    /** Shutdown this pulsar runtime. */
-    void tearDown();
+    public FixedRangeGenerator(List<TopicRange> ranges) {
+        this.ranges = ranges;
+    }
 
-    /** Return a operator for operating this pulsar runtime. */
-    PulsarRuntimeOperator operator();
+    @Override
+    public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
+        return ranges;
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 89457dc..8cc2e0a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -37,7 +37,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
 
     // Defines pulsar running environment
-    @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK);
+    @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
 
     // Defines a external context Factories,
     // so test cases will be invoked using this external contexts.
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
index ea9c4ab..c2afee5 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
@@ -18,41 +18,31 @@
 
 package org.apache.flink.connector.pulsar.testutils;
 
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 
 import java.util.Collection;
 
-import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-
 /** Source split data writer for writing test data into a Pulsar topic partition. */
 public class PulsarPartitionDataWriter implements SourceSplitDataWriter<String> {
 
-    private final Producer<String> producer;
+    private final PulsarRuntimeOperator operator;
+    private final String fullTopicName;
 
-    public PulsarPartitionDataWriter(PulsarClient client, TopicPartition 
partition) {
-        try {
-            this.producer =
-                    
client.newProducer(Schema.STRING).topic(partition.getFullTopicName()).create();
-        } catch (PulsarClientException e) {
-            throw new IllegalStateException(e);
-        }
+    public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String 
fullTopicName) {
+        this.operator = operator;
+        this.fullTopicName = fullTopicName;
     }
 
     @Override
     public void writeRecords(Collection<String> records) {
-        for (String record : records) {
-            sneakyClient(() -> producer.newMessage().value(record).send());
-        }
+        operator.sendMessages(fullTopicName, Schema.STRING, records);
     }
 
     @Override
-    public void close() throws Exception {
-        producer.close();
+    public void close() {
+        // Nothing to do.
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index a80d721..2ad4c2f 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -29,15 +29,14 @@ import static 
org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 /** Common test context for pulsar based test. */
 public abstract class PulsarTestContext<T> implements ExternalContext<T> {
+    private static final long serialVersionUID = 4717940854368532130L;
 
     private static final int NUM_RECORDS_UPPER_BOUND = 500;
     private static final int NUM_RECORDS_LOWER_BOUND = 100;
 
-    private final String displayName;
     protected final PulsarRuntimeOperator operator;
 
-    protected PulsarTestContext(String displayName, PulsarTestEnvironment 
environment) {
-        this.displayName = displayName;
+    protected PulsarTestContext(PulsarTestEnvironment environment) {
         this.operator = environment.operator();
     }
 
@@ -58,8 +57,10 @@ public abstract class PulsarTestContext<T> implements 
ExternalContext<T> {
         return records;
     }
 
+    protected abstract String displayName();
+
     @Override
     public String toString() {
-        return displayName;
+        return displayName();
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
index 7b31c7c..50ca3fe 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.testutils;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 import org.apache.flink.connectors.test.common.TestResource;
 import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
 
@@ -54,10 +53,10 @@ import java.util.List;
 public class PulsarTestEnvironment
         implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule 
{
 
-    private final PulsarRuntimeProvider provider;
+    private final PulsarRuntime runtime;
 
     public PulsarTestEnvironment(PulsarRuntime runtime) {
-        this.provider = runtime.provider();
+        this.runtime = runtime;
     }
 
     /** JUnit 4 Rule based test logic. */
@@ -66,7 +65,7 @@ public class PulsarTestEnvironment
         return new Statement() {
             @Override
             public void evaluate() throws Throwable {
-                provider.startUp();
+                runtime.startUp();
 
                 List<Throwable> errors = new ArrayList<>();
                 try {
@@ -75,7 +74,7 @@ public class PulsarTestEnvironment
                     errors.add(t);
                 } finally {
                     try {
-                        provider.tearDown();
+                        runtime.tearDown();
                     } catch (Throwable t) {
                         errors.add(t);
                     }
@@ -88,29 +87,29 @@ public class PulsarTestEnvironment
     /** JUnit 5 Extension setup method. */
     @Override
     public void beforeAll(ExtensionContext context) {
-        provider.startUp();
+        runtime.startUp();
     }
 
     /** flink-connector-testing setup method. */
     @Override
     public void startUp() {
-        provider.startUp();
+        runtime.startUp();
     }
 
     /** JUnit 5 Extension shutdown method. */
     @Override
     public void afterAll(ExtensionContext context) {
-        provider.tearDown();
+        runtime.tearDown();
     }
 
     /** flink-connector-testing shutdown method. */
     @Override
     public void tearDown() {
-        provider.tearDown();
+        runtime.tearDown();
     }
 
     /** Get a common supported set of method for operating pulsar which is in 
container. */
     public PulsarRuntimeOperator operator() {
-        return provider.operator();
+        return runtime.operator();
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index 2321bd4..18a8655 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
      * pulsar broker. Override this method when needs.
      */
     protected PulsarRuntime runtime() {
-        return PulsarRuntime.MOCK;
+        return PulsarRuntime.mock();
     }
 
     /** Operate pulsar by acquiring a runtime operator. */
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 7ce676c..12dbabe 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -18,96 +18,33 @@
 
 package org.apache.flink.connector.pulsar.testutils.cases;
 
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import org.apache.pulsar.client.api.SubscriptionType;
 
 /**
  * Pulsar external context that will create multiple topics with only one 
partitions as source
  * splits.
  */
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
-
-    private int numTopics = 0;
-
-    private final String topicPattern;
-
-    private final Map<String, SourceSplitDataWriter<String>> 
topicNameToSplitWriters =
-            new HashMap<>();
+public class MultipleTopicConsumingContext extends 
MultipleTopicTemplateContext {
+    private static final long serialVersionUID = -3855336888090886528L;
 
     public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on multiple topic", environment);
-        this.topicPattern =
-                "pulsar-multiple-topic-[0-9]+-"
-                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        super(environment);
     }
 
     @Override
-    public Source<String, ?, ?> createSource(Boundedness boundedness) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(topicPattern, 
RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(Exclusive)
-                        
.setSubscriptionName("flink-pulsar-multiple-topic-test");
-        if (boundedness == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be 
stopped.
-            // This is required for SourceTestSuiteBase.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
+    protected String displayName() {
+        return "consuming message on multiple topic";
     }
 
     @Override
-    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
-        String topicName = topicPattern.replace("[0-9]+", 
String.valueOf(numTopics));
-        operator.createTopic(topicName, 1);
-
-        String partitionName = 
TopicNameUtils.topicNameWithPartition(topicName, 0);
-        TopicPartition partition = new TopicPartition(partitionName, 0, 
createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
-
-        topicNameToSplitWriters.put(partitionName, writer);
-        numTopics++;
-
-        return writer;
-    }
-
-    @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
+    protected String subscriptionName() {
+        return "flink-pulsar-multiple-topic-test";
     }
 
     @Override
-    public void close() throws Exception {
-        for (SourceSplitDataWriter<String> writer : 
topicNameToSplitWriters.values()) {
-            writer.close();
-        }
-
-        topicNameToSplitWriters.clear();
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Exclusive;
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
similarity index 72%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
copy to 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
index 7ce676c..a0801ec 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
@@ -24,42 +24,38 @@ import 
org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
 
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
 /**
- * Pulsar external context that will create multiple topics with only one 
partitions as source
- * splits.
+ * Pulsar external context template that will create multiple topics with only 
one partitions as
+ * source splits.
  */
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
+public abstract class MultipleTopicTemplateContext extends 
PulsarTestContext<String> {
+    private static final long serialVersionUID = 7333807392445848344L;
 
     private int numTopics = 0;
 
-    private final String topicPattern;
+    private final String topicPattern = "pulsar-multiple-topic-[0-9]+-" + 
randomAlphabetic(8);
 
     private final Map<String, SourceSplitDataWriter<String>> 
topicNameToSplitWriters =
             new HashMap<>();
 
-    public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on multiple topic", environment);
-        this.topicPattern =
-                "pulsar-multiple-topic-[0-9]+-"
-                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    public MultipleTopicTemplateContext(PulsarTestEnvironment environment) {
+        super(environment);
     }
 
     @Override
@@ -67,11 +63,11 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
+                        .setServiceUrl(serviceUrl())
+                        .setAdminUrl(adminUrl())
                         .setTopicPattern(topicPattern, 
RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(Exclusive)
-                        
.setSubscriptionName("flink-pulsar-multiple-topic-test");
+                        .setSubscriptionType(subscriptionType())
+                        .setSubscriptionName(subscriptionName());
         if (boundedness == Boundedness.BOUNDED) {
             // Using latest stop cursor for making sure the source could be 
stopped.
             // This is required for SourceTestSuiteBase.
@@ -87,9 +83,7 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
         operator.createTopic(topicName, 1);
 
         String partitionName = 
TopicNameUtils.topicNameWithPartition(topicName, 0);
-        TopicPartition partition = new TopicPartition(partitionName, 0, 
createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
+        PulsarPartitionDataWriter writer = new 
PulsarPartitionDataWriter(operator, partitionName);
 
         topicNameToSplitWriters.put(partitionName, writer);
         numTopics++;
@@ -110,4 +104,16 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
 
         topicNameToSplitWriters.clear();
     }
+
+    protected abstract String subscriptionName();
+
+    protected abstract SubscriptionType subscriptionType();
+
+    protected String serviceUrl() {
+        return operator.serviceUrl();
+    }
+
+    protected String adminUrl() {
+        return operator.adminUrl();
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index cb1b582..b89511c 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
@@ -34,7 +34,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
@@ -44,6 +43,7 @@ import static 
org.apache.pulsar.client.api.SubscriptionType.Exclusive;
  * source splits.
  */
 public class SingleTopicConsumingContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = 2754642285356345741L;
 
     private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
     private final String topicName;
@@ -53,12 +53,17 @@ public class SingleTopicConsumingContext extends 
PulsarTestContext<String> {
     private int numSplits = 0;
 
     public SingleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on single topic", environment);
+        super(environment);
         this.topicName =
                 TOPIC_NAME_PREFIX + "-" + 
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
     }
 
     @Override
+    protected String displayName() {
+        return "consuming message on single topic";
+    }
+
+    @Override
     public Source<String, ?, ?> createSource(Boundedness boundedness) {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
@@ -88,9 +93,8 @@ public class SingleTopicConsumingContext extends 
PulsarTestContext<String> {
             operator.increaseTopicPartitions(topicName, numSplits);
         }
 
-        TopicPartition partition = new TopicPartition(topicName, numSplits - 
1, createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
+        String partitionName = 
TopicNameUtils.topicNameWithPartition(topicName, numSplits - 1);
+        PulsarPartitionDataWriter writer = new 
PulsarPartitionDataWriter(operator, partitionName);
         partitionToSplitWriter.put(numSplits - 1, writer);
 
         return writer;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 986f4bd..d46658e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -18,33 +18,36 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime;
 
-import 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerProvider;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockProvider;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
 
-import java.util.function.Supplier;
+import org.testcontainers.containers.GenericContainer;
 
 /**
- * A enum class for providing a operable pulsar runtime. We support two types 
of runtime, the
- * container and mock.
+ * An abstraction for different pulsar runtimes. Providing the common methods 
for {@link
+ * PulsarTestEnvironment}.
  */
-public enum PulsarRuntime {
+public interface PulsarRuntime {
 
-    /**
-     * The whole pulsar cluster would run in a docker container, provide the 
full fledged test
-     * backend.
-     */
-    CONTAINER(PulsarContainerProvider::new),
+    /** Start up this pulsar runtime, block the thread until everything is 
ready for this runtime. */
+    void startUp();
 
-    /** The bookkeeper and zookeeper would use a mock backend, and start a 
single pulsar broker. */
-    MOCK(PulsarMockProvider::new);
+    /** Shutdown this pulsar runtime. */
+    void tearDown();
 
-    private final Supplier<PulsarRuntimeProvider> provider;
+    /** Return an operator for operating this pulsar runtime. */
+    PulsarRuntimeOperator operator();
 
-    PulsarRuntime(Supplier<PulsarRuntimeProvider> provider) {
-        this.provider = provider;
+    static PulsarRuntime mock() {
+        return new PulsarMockRuntime();
     }
 
-    public PulsarRuntimeProvider provider() {
-        return provider.get();
+    static PulsarRuntime container() {
+        return new PulsarContainerRuntime();
+    }
+
+    static PulsarRuntime container(GenericContainer<?> flinkContainer) {
+        return new 
PulsarContainerRuntime().bindWithFlinkContainer(flinkContainer);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index a68c065..2d26925 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connectors.test.common.external.ExternalContext;
 
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
@@ -148,13 +150,30 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         return messageIds.get(0);
     }
 
+    public <T> MessageId sendMessage(String topic, Schema<T> schema, String 
key, T message) {
+        List<MessageId> messageIds = sendMessages(topic, schema, key, 
singletonList(message));
+        checkArgument(messageIds.size() == 1);
+
+        return messageIds.get(0);
+    }
+
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, Collection<T> messages) {
+        return sendMessages(topic, schema, null, messages);
+    }
+
+    public <T> List<MessageId> sendMessages(
+            String topic, Schema<T> schema, String key, Collection<T> 
messages) {
         try (Producer<T> producer = 
client().newProducer(schema).topic(topic).create()) {
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
-                MessageId messageId = 
producer.newMessage().value(message).send();
+                MessageId messageId;
+                if (Strings.isNullOrEmpty(key)) {
+                    messageId = producer.newMessage().value(message).send();
+                } else {
+                    messageId = 
producer.newMessage().key(key).value(message).send();
+                }
                 messageIds.add(messageId);
             }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
similarity index 75%
rename from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
rename to 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 06be3cb..5560767 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime.container;
 
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 import org.apache.flink.util.DockerImageVersions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -36,13 +37,22 @@ import java.time.Duration;
 import static org.apache.flink.util.DockerImageVersions.PULSAR;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
+import static org.testcontainers.containers.PulsarContainer.BROKER_PORT;
 
 /**
- * {@link PulsarRuntimeProvider} implementation, use the TestContainers as the 
backend. We would
- * start a pulsar container by this provider.
+ * {@link PulsarRuntime} implementation, use the TestContainers as the 
backend. We would start a
+ * pulsar container by this provider.
  */
-public class PulsarContainerProvider implements PulsarRuntimeProvider {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarContainerProvider.class);
+public class PulsarContainerRuntime implements PulsarRuntime {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarContainerRuntime.class);
+    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
+
+    // This url is used on the container side.
+    public static final String PULSAR_SERVICE_URL =
+            String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_PORT);
+    // This url is used on the container side.
+    public static final String PULSAR_ADMIN_URL =
+            String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_HTTP_PORT);
 
     /**
      * Create a pulsar container provider by a predefined version, this 
constance {@link
@@ -52,6 +62,14 @@ public class PulsarContainerProvider implements 
PulsarRuntimeProvider {
 
     private PulsarRuntimeOperator operator;
 
+    public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> 
flinkContainer) {
+        this.container
+                .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME)
+                .dependsOn(flinkContainer)
+                .withNetwork(flinkContainer.getNetwork());
+        return this;
+    }
+
     @Override
     public void startUp() {
         // Prepare Pulsar Container.
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
similarity index 76%
rename from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
rename to 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index 1cb8ce9..552ce42 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime.mock;
 
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
@@ -39,17 +39,17 @@ import static 
org.apache.flink.connector.pulsar.testutils.runtime.mock.PortBindi
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Providing a mocked pulsar server. */
-public class PulsarMockProvider implements PulsarRuntimeProvider {
+public class PulsarMockRuntime implements PulsarRuntime {
 
     private static final String CLUSTER_NAME = "mock-pulsar-" + 
randomAlphanumeric(6);
     private final MockPulsarService pulsarService;
     private PulsarRuntimeOperator operator;
 
-    public PulsarMockProvider() {
+    public PulsarMockRuntime() {
         this(createConfig());
     }
 
-    public PulsarMockProvider(ServiceConfiguration configuration) {
+    public PulsarMockRuntime(ServiceConfiguration configuration) {
         this.pulsarService = new MockPulsarService(configuration);
     }
 
@@ -95,20 +95,34 @@ public class PulsarMockProvider implements 
PulsarRuntimeProvider {
             admin.clusters().createCluster(CLUSTER_NAME, data);
         }
 
+        createOrUpdateTenant("public");
+        createOrUpdateNamespace("public", "default");
+
+        createOrUpdateTenant("pulsar");
+        createOrUpdateNamespace("pulsar", "system");
+    }
+
+    private void createOrUpdateTenant(String tenant) throws 
PulsarAdminException {
+        PulsarAdmin admin = operator().admin();
         TenantInfo info =
                 TenantInfo.builder()
                         .adminRoles(ImmutableSet.of("appid1", "appid2"))
                         .allowedClusters(ImmutableSet.of(CLUSTER_NAME))
                         .build();
-        if (!admin.tenants().getTenants().contains("public")) {
-            admin.tenants().createTenant("public", info);
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants().createTenant(tenant, info);
         } else {
-            admin.tenants().updateTenant("public", info);
+            admin.tenants().updateTenant(tenant, info);
         }
+    }
 
-        if 
(!admin.namespaces().getNamespaces("public").contains("public/default")) {
-            admin.namespaces().createNamespace("public/default");
-            admin.namespaces().setRetention("public/default", new 
RetentionPolicies(60, 1000));
+    public void createOrUpdateNamespace(String tenant, String namespace)
+            throws PulsarAdminException {
+        PulsarAdmin admin = operator().admin();
+        String namespaceValue = tenant + "/" + namespace;
+        if 
(!admin.namespaces().getNamespaces(tenant).contains(namespaceValue)) {
+            admin.namespaces().createNamespace(namespaceValue);
+            admin.namespaces().setRetention(namespaceValue, new 
RetentionPolicies(60, 1000));
         }
     }
 
@@ -135,6 +149,13 @@ public class PulsarMockProvider implements 
PulsarRuntimeProvider {
         configuration.setBrokerServicePort(Optional.of(findAvailablePort()));
         configuration.setWebServicePort(Optional.of(findAvailablePort()));
 
+        // Enable transaction with in memory.
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setTransactionMetadataStoreProviderClassName(
+                
"org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider");
+        configuration.setTransactionBufferProviderClassName(
+                
"org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider");
+
         return configuration;
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
index 070bd82..9630003 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
@@ -42,10 +42,7 @@ public class FlinkContainerTestEnvironment implements 
TestEnvironment, ClusterCo
     public FlinkContainerTestEnvironment(
             int numTaskManagers, int numSlotsPerTaskManager, String... 
jarPath) {
 
-        Configuration flinkConfiguration = new Configuration();
-        flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
-        flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
-        flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        Configuration flinkConfiguration = flinkConfiguration();
         flinkConfiguration.set(NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
         this.flinkContainer =
@@ -113,4 +110,13 @@ public class FlinkContainerTestEnvironment implements 
TestEnvironment, ClusterCo
     public FlinkContainer getFlinkContainer() {
         return this.flinkContainer;
     }
+
+    protected Configuration flinkConfiguration() {
+        Configuration flinkConfiguration = new Configuration();
+        flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
+        flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
+        flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+
+        return flinkConfiguration;
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
new file mode 100644
index 0000000..269f89c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.15-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-end-to-end-tests-pulsar</artifactId>
+       <name>Flink : E2E Tests : Pulsar</name>
+
+       <properties>
+               <pulsar.version>2.8.0</pulsar.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>pulsar</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-dependency-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>copy</id>
+                                               
<phase>pre-integration-test</phase>
+                                               <goals>
+                                                       <goal>copy</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <artifactItems>
+                                               <artifactItem>
+                                                       
<groupId>org.apache.flink</groupId>
+                                                       
<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+                                                       
<version>${project.version}</version>
+                                                       
<destFileName>pulsar-connector.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                               <artifactItem>
+                                                       
<groupId>org.apache.pulsar</groupId>
+                                                       
<artifactId>pulsar-client-all</artifactId>
+                                                       
<version>${pulsar.version}</version>
+                                                       
<destFileName>pulsar-client-all.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                               <artifactItem>
+                                                       
<groupId>org.apache.pulsar</groupId>
+                                                       
<artifactId>pulsar-client-api</artifactId>
+                                                       
<version>${pulsar.version}</version>
+                                                       
<destFileName>pulsar-client-api.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                               <artifactItem>
+                                                       
<groupId>org.apache.pulsar</groupId>
+                                                       
<artifactId>pulsar-client-admin-api</artifactId>
+                                                       
<version>${pulsar.version}</version>
+                                                       
<destFileName>pulsar-admin-api.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                               <artifactItem>
+                                                       
<groupId>org.slf4j</groupId>
+                                                       
<artifactId>jul-to-slf4j</artifactId>
+                                                       
<version>${slf4j.version}</version>
+                                                       
<destFileName>jul-to-slf4j.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                       </artifactItems>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
similarity index 51%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
copy to 
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 89457dc..1427c2b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -16,36 +16,40 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source;
+package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import 
org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingContext;
-import 
org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import 
org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
 import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
 import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
 import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
 import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
+import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 
-/** Unite test class for {@link PulsarSource}. */
-@SuppressWarnings("unused")
-class PulsarSourceITCase extends SourceTestSuiteBase<String> {
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
 
-    // Defines test environment on Flink MiniCluster
-    @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for 
Failover & Exclusive
+ * subscription.
+ */
+public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment.
+    @TestEnv
+    FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 6);
 
-    // Defines pulsar running environment
-    @ExternalSystem PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.MOCK);
+    // Defines ConnectorExternalSystem.
+    @ExternalSystem
+    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(container(flink.getFlinkContainer()));
 
-    // Defines a external context Factories,
-    // so test cases will be invoked using this external contexts.
+    // Defines a set of external context Factories for different test cases.
     @ExternalContextFactory
-    PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic =
-            new PulsarTestContextFactory<>(pulsar, 
SingleTopicConsumingContext::new);
+    PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive =
+            new PulsarTestContextFactory<>(pulsar, 
ExclusiveSubscriptionContext::new);
 
     @ExternalContextFactory
-    PulsarTestContextFactory<String, MultipleTopicConsumingContext> 
multipleTopic =
-            new PulsarTestContextFactory<>(pulsar, 
MultipleTopicConsumingContext::new);
+    PulsarTestContextFactory<String, FailoverSubscriptionContext> failover =
+            new PulsarTestContextFactory<>(pulsar, 
FailoverSubscriptionContext::new);
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
new file mode 100644
index 0000000..25cab21
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
+import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for Shared 
& Key_Shared
+ * subscription.
+ */
+public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment.
+    @TestEnv
+    FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 8);
+
+    // Defines ConnectorExternalSystem.
+    @ExternalSystem
+    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(container(flink.getFlinkContainer()));
+
+    // Defines a set of external context Factories for different test cases.
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
+            new PulsarTestContextFactory<>(pulsar, 
SharedSubscriptionContext::new);
+
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
+            new PulsarTestContextFactory<>(pulsar, 
KeySharedSubscriptionContext::new);
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
new file mode 100644
index 0000000..18b2ffc
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import 
org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** Consumes from test splits by using a {@link 
SubscriptionType#Exclusive} subscription. */
+public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext 
{
+    private static final long serialVersionUID = 6238209089442257487L;
+
+    public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Exclusive";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-exclusive-subscription";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Exclusive;
+    }
+
+    @Override
+    protected String serviceUrl() {
+        return PULSAR_SERVICE_URL;
+    }
+
+    @Override
+    protected String adminUrl() {
+        return PULSAR_ADMIN_URL;
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
new file mode 100644
index 0000000..c322efa
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import 
org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** Consumes from test splits by using a {@link 
SubscriptionType#Failover} subscription. */
+public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+    private static final long serialVersionUID = 6238209089442257487L;
+
+    public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Failover";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-failover-subscription";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Failover;
+    }
+
+    @Override
+    protected String serviceUrl() {
+        return PULSAR_SERVICE_URL;
+    }
+
+    @Override
+    protected String adminUrl() {
+        return PULSAR_ADMIN_URL;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
similarity index 50%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
copy to 
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index 7ce676c..e442418 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.cases;
+package org.apache.flink.tests.util.pulsar.cases;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -24,57 +24,77 @@ import 
org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import 
org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
 /**
- * Pulsar external context that will create multiple topics with only one 
partitions as source
- * splits.
+ * Consumes from test splits by using a {@link 
SubscriptionType#Key_Shared} subscription.
  */
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
+public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = 3246516520107893983L;
 
-    private int numTopics = 0;
+    private int index = 0;
 
-    private final String topicPattern;
+    private final List<KeyedPulsarPartitionDataWriter> writers = new 
ArrayList<>();
 
-    private final Map<String, SourceSplitDataWriter<String>> 
topicNameToSplitWriters =
-            new HashMap<>();
+    // Message keys.
+    private final String key1;
+    private final String key2;
 
-    public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on multiple topic", environment);
-        this.topicPattern =
-                "pulsar-multiple-topic-[0-9]+-"
-                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+
+        // Init message keys.
+        this.key1 = randomAlphabetic(8);
+        String newKey2;
+        do {
+            newKey2 = randomAlphabetic(8);
+        } while (keyHash(key1) == keyHash(newKey2));
+        this.key2 = newKey2;
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Key_Shared";
     }
 
     @Override
     public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        int keyHash = keyHash(key1);
+        TopicRange range = new TopicRange(keyHash, keyHash);
+
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(topicPattern, 
RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(Exclusive)
-                        
.setSubscriptionName("flink-pulsar-multiple-topic-test");
+                        .setServiceUrl(PULSAR_SERVICE_URL)
+                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setTopicPattern(
+                                "pulsar-[0-9]+-key-shared", 
RegexSubscriptionMode.AllTopics)
+                        .setSubscriptionType(SubscriptionType.Key_Shared)
+                        .setSubscriptionName("pulsar-key-shared")
+                        .setRangeGenerator(new 
FixedRangeGenerator(singletonList(range)));
         if (boundedness == Boundedness.BOUNDED) {
             // Using latest stop cursor for making sure the source could be 
stopped.
-            // This is required for SourceTestSuiteBase.
             builder.setBoundedStopCursor(StopCursor.latest());
         }
 
@@ -83,16 +103,14 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
 
     @Override
     public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
-        String topicName = topicPattern.replace("[0-9]+", 
String.valueOf(numTopics));
+        String topicName = "pulsar-" + index + "-key-shared";
         operator.createTopic(topicName, 1);
+        index++;
 
         String partitionName = 
TopicNameUtils.topicNameWithPartition(topicName, 0);
-        TopicPartition partition = new TopicPartition(partitionName, 0, 
createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
-
-        topicNameToSplitWriters.put(partitionName, writer);
-        numTopics++;
+        KeyedPulsarPartitionDataWriter writer =
+                new KeyedPulsarPartitionDataWriter(operator, partitionName, 
key1, key2);
+        writers.add(writer);
 
         return writer;
     }
@@ -103,11 +121,14 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public void close() throws Exception {
-        for (SourceSplitDataWriter<String> writer : 
topicNameToSplitWriters.values()) {
+    public void close() {
+        for (KeyedPulsarPartitionDataWriter writer : writers) {
             writer.close();
         }
+        writers.clear();
+    }
 
-        topicNameToSplitWriters.clear();
+    private int keyHash(String key) {
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
RANGE_SIZE;
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
similarity index 57%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
copy to 
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index 7ce676c..f936b6f 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.cases;
+package org.apache.flink.tests.util.pulsar.cases;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -24,42 +24,38 @@ import 
org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
-/**
- * Pulsar external context that will create multiple topics with only one 
partitions as source
- * splits.
- */
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
+/** Consumes from test splits by using a {@link 
SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = -2798707923661295245L;
 
-    private int numTopics = 0;
+    private int index = 0;
 
-    private final String topicPattern;
+    private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
 
-    private final Map<String, SourceSplitDataWriter<String>> 
topicNameToSplitWriters =
-            new HashMap<>();
+    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
 
-    public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on multiple topic", environment);
-        this.topicPattern =
-                "pulsar-multiple-topic-[0-9]+-"
-                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    @Override
+    protected String displayName() {
+        return "consuming message by Shared";
     }
 
     @Override
@@ -67,14 +63,13 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(topicPattern, 
RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(Exclusive)
-                        
.setSubscriptionName("flink-pulsar-multiple-topic-test");
+                        .setServiceUrl(PULSAR_SERVICE_URL)
+                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setTopicPattern("pulsar-[0-9]+-shared", 
RegexSubscriptionMode.AllTopics)
+                        .setSubscriptionType(SubscriptionType.Shared)
+                        .setSubscriptionName("pulsar-shared");
         if (boundedness == Boundedness.BOUNDED) {
             // Using latest stop cursor for making sure the source could be 
stopped.
-            // This is required for SourceTestSuiteBase.
             builder.setBoundedStopCursor(StopCursor.latest());
         }
 
@@ -83,16 +78,13 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
 
     @Override
     public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
-        String topicName = topicPattern.replace("[0-9]+", 
String.valueOf(numTopics));
+        String topicName = "pulsar-" + index + "-shared";
         operator.createTopic(topicName, 1);
+        index++;
 
         String partitionName = 
TopicNameUtils.topicNameWithPartition(topicName, 0);
-        TopicPartition partition = new TopicPartition(partitionName, 0, 
createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
-
-        topicNameToSplitWriters.put(partitionName, writer);
-        numTopics++;
+        PulsarPartitionDataWriter writer = new 
PulsarPartitionDataWriter(operator, partitionName);
+        writers.add(writer);
 
         return writer;
     }
@@ -103,11 +95,10 @@ public class MultipleTopicConsumingContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public void close() throws Exception {
-        for (SourceSplitDataWriter<String> writer : 
topicNameToSplitWriters.values()) {
+    public void close() {
+        for (PulsarPartitionDataWriter writer : writers) {
             writer.close();
         }
-
-        topicNameToSplitWriters.clear();
+        writers.clear();
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
new file mode 100644
index 0000000..890d09e
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY;
+
+/** A Flink container which bundles the Pulsar connector in its classpath. */
+public class FlinkContainerWithPulsarEnvironment extends 
FlinkContainerTestEnvironment {
+
+    public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int 
numSlotsPerTaskManager) {
+        super(
+                numTaskManagers,
+                numSlotsPerTaskManager,
+                resourcePath("pulsar-connector.jar"),
+                resourcePath("pulsar-client-all.jar"),
+                resourcePath("pulsar-client-api.jar"),
+                resourcePath("pulsar-admin-api.jar"),
+                resourcePath("jul-to-slf4j.jar"));
+    }
+
+    private static String resourcePath(String jarName) {
+        return TestUtils.getResource(jarName).toAbsolutePath().toString();
+    }
+
+    @Override
+    protected Configuration flinkConfiguration() {
+        Configuration configuration = super.flinkConfiguration();
+        // Increase the off-heap memory to avoid direct buffer memory 
errors in Pulsar e2e tests.
+        configuration.set(TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100));
+
+        return configuration;
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
new file mode 100644
index 0000000..eea97e6
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Source split data writer for writing test data into a Pulsar topic 
partition. It writes the
+ * records under key1, then a key1-suffixed copy of each record under key2.
+ */
+public class KeyedPulsarPartitionDataWriter implements 
SourceSplitDataWriter<String> {
+
+    private final PulsarRuntimeOperator operator;
+    private final String fullTopicName;
+    private final String key1;
+    private final String key2;
+
+    public KeyedPulsarPartitionDataWriter(
+            PulsarRuntimeOperator operator, String fullTopicName, String key1, 
String key2) {
+        this.operator = operator;
+        this.fullTopicName = fullTopicName;
+        this.key1 = key1;
+        this.key2 = key2;
+    }
+
+    @Override
+    public void writeRecords(Collection<String> records) {
+        operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+
+        List<String> newRecords = records.stream().map(a -> a + 
key1).collect(toList());
+        operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
new file mode 100644
index 0000000..c452fe6
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/** A source test template for testing messages which could be consumed in 
an unordered way. */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class UnorderedSourceTestSuiteBase<T> {
+
+    @TestTemplate
+    @DisplayName("Test source with one split and four consumers")
+    public void testOneSplitWithMultipleConsumers(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
+        Collection<T> testData =
+                externalContext.generateTestData(0, 
ThreadLocalRandom.current().nextLong());
+        SourceSplitDataWriter<T> writer = 
externalContext.createSourceSplitDataWriter();
+        writer.writeRecords(testData);
+
+        Source<T, ?, ?> source = 
externalContext.createSource(Boundedness.BOUNDED);
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        List<T> results =
+                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), 
"Pulsar source")
+                        .setParallelism(4)
+                        .executeAndCollect(
+                                "Source single split with four readers.", 
testData.size());
+
+        assertThat(results, containsInAnyOrder(testData.toArray()));
+    }
+}
diff --git 
a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
similarity index 58%
copy from 
tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
copy to 
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
index 521d055..835c2ec 100644
--- 
a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
@@ -16,26 +16,13 @@
 # limitations under the License.
 
################################################################################
 
-# These modules are not deployed to maven central, despite their use of the 
shade plugin.
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
 
-flink-examples-streaming-twitter
-flink-examples-streaming-gcp-pubsub
-flink-yarn-tests
-flink-docs
-flink-datastream-allround-test
-flink-queryable-state-test
-flink-confluent-schema-registry
-flink-stream-stateful-job-upgrade-test
-flink-elasticsearch7-test
-flink-stream-state-ttl-test
-flink-state-evolution-test
-flink-elasticsearch6-test
-flink-rocksdb-state-memory-control-test
-flink-python-test
-flink-streaming-kinesis-test
-flink-tpch-test
-flink-streaming-kafka-test-base
-flink-heavy-deployment-stress-test
-flink-elasticsearch5-test
-flink-high-parallelism-iterations-test
-flink-end-to-end-tests-common-kafka
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8aada21..84ca52e 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -88,6 +88,7 @@ under the License.
                <module>flink-python-test</module>
                <module>flink-end-to-end-tests-hbase</module>
                <module>flink-glue-schema-registry-test</module>
+               <module>flink-end-to-end-tests-pulsar</module>
        </modules>
 
        <dependencyManagement>
diff --git 
a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
 
b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index 521d055..110ba08 100644
--- 
a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ 
b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -39,3 +39,4 @@ flink-heavy-deployment-stress-test
 flink-elasticsearch5-test
 flink-high-parallelism-iterations-test
 flink-end-to-end-tests-common-kafka
+flink-end-to-end-tests-pulsar

Reply via email to