This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6be14da65fedf01e82dc83a58e791709ce8ce57
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 15:09:50 2022 +0800

    [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test 
tools based on PulsarStandalone.
    
    1. Drop some unused fields in test classes.
    2. Fix the checkstyle issues for source test.
    3. Fix violations for Pulsar connector according to the 
flink-architecture-tests.
    4. Create a standalone Pulsar for test.
    5. Add new methods to PulsarRuntimeOperator.
    6. Fix the bug in PulsarContainerRuntime, support running tests in E2E 
environment.
    7. Create PulsarContainerTestEnvironment for supporting E2E tests.
    8. Add a lot of comments for Pulsar testing tools.
    9. Drop mocked Pulsar service, use standalone Pulsar instead.
---
 flink-connectors/flink-connector-pulsar/pom.xml    |  16 -
 .../source/enumerator/cursor/StopCursor.java       |   2 -
 .../source/enumerator/topic/TopicPartition.java    |   4 +-
 .../split/PulsarUnorderedPartitionSplitReader.java |  16 +-
 .../common/schema/PulsarSchemaUtilsTest.java       |   6 +-
 .../pulsar/source/PulsarSourceITCase.java          |   2 +-
 .../subscriber/PulsarSubscriberTest.java           |  10 +-
 .../reader/source/PulsarSourceReaderTestBase.java  |   2 +-
 .../pulsar/testutils/PulsarTestContext.java        |   4 -
 .../pulsar/testutils/PulsarTestSuiteBase.java      |   2 +-
 .../connector/pulsar/testutils/SampleData.java     |  96 -----
 .../cases/MultipleTopicConsumingContext.java       |   1 -
 .../cases/MultipleTopicTemplateContext.java        |   1 -
 .../cases/SingleTopicConsumingContext.java         |   1 -
 .../pulsar/testutils/runtime/PulsarRuntime.java    |  36 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   | 414 ++++++++++++++++++---
 .../runtime/container/PulsarContainerRuntime.java  |  61 ++-
 .../runtime/embedded/PulsarEmbeddedRuntime.java    | 284 ++++++++++++++
 .../runtime/mock/BlankBrokerInterceptor.java       |  61 ---
 .../runtime/mock/MockBookKeeperClientFactory.java  |  74 ----
 .../testutils/runtime/mock/MockPulsarService.java  |  87 -----
 .../runtime/mock/MockZooKeeperClientFactory.java   |  73 ----
 .../runtime/mock/NonClosableMockBookKeeper.java    |  55 ---
 .../testutils/runtime/mock/PulsarMockRuntime.java  | 160 --------
 .../mock/SameThreadOrderedSafeExecutor.java        |  56 ---
 .../test/resources/containers/txnStandalone.conf   | 100 ++++-
 .../util/flink/container/FlinkContainers.java      |   2 +-
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |   7 +-
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |   7 +-
 .../pulsar/cases/ExclusiveSubscriptionContext.java |  14 -
 .../pulsar/cases/FailoverSubscriptionContext.java  |  14 -
 .../pulsar/cases/KeySharedSubscriptionContext.java |   7 +-
 .../pulsar/cases/SharedSubscriptionContext.java    |   7 +-
 .../common/PulsarContainerTestEnvironment.java     |  31 ++
 34 files changed, 867 insertions(+), 846 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml 
b/flink-connectors/flink-connector-pulsar/pom.xml
index 45047eb..fc7b68c 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -120,22 +120,6 @@ under the License.
                <!-- we don't override the version here. -->
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
-                       <artifactId>testmocks</artifactId>
-                       <version>${pulsar.version}</version>
-                       <scope>test</scope>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.testng</groupId>
-                                       <artifactId>testng</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.powermock</groupId>
-                                       
<artifactId>powermock-module-testng</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-broker</artifactId>
                        <version>${pulsar.version}</version>
                        <scope>test</scope>
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index b85944f..aaec143 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.MessageIdStopCursor;
@@ -42,7 +41,6 @@ import java.io.Serializable;
 public interface StopCursor extends Serializable {
 
     /** The open method for the cursor initializer. This method could be 
executed multiple times. */
-    @Internal
     default void open(PulsarAdmin admin, TopicPartition partition) {}
 
     /**
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
index f29d88d..b3035cd 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.topic;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 
 import org.apache.pulsar.client.api.Range;
@@ -35,7 +35,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Topic partition is the basic topic information used by {@link SplitReader}, 
we create this topic
  * metas for a specified topic by subscription type and convert it into a 
partition split.
  */
-@Internal
+@PublicEvolving
 public class TopicPartition implements Serializable {
     private static final long serialVersionUID = -1474354741550810953L;
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 2027df4..5940cc9 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -40,11 +40,11 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
 
 /**
  * The split reader a given {@link PulsarPartitionSplit}, it would be closed 
once the {@link
@@ -162,18 +162,6 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
 
     private Transaction newTransaction() {
         long timeoutMillis = sourceConfiguration.getTransactionTimeoutMillis();
-        CompletableFuture<Transaction> future =
-                sneakyClient(pulsarClient::newTransaction)
-                        .withTransactionTimeout(timeoutMillis, 
TimeUnit.MILLISECONDS)
-                        .build();
-
-        try {
-            return future.get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        return createTransaction(pulsarClient, timeoutMillis);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtilsTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtilsTest.java
index 1aa4404..d4bd1fc 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtilsTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtilsTest.java
@@ -57,14 +57,12 @@ class PulsarSchemaUtilsTest {
     }
 
     @Test
-    @SuppressWarnings("java:S5778")
     void createSchemaForComplexSchema() {
         // Avro
         Schema<Foo> avro1 = Schema.AVRO(Foo.class);
         PulsarSchema<Foo> avro2 = new PulsarSchema<>(avro1, Foo.class);
-        assertThrows(
-                NullPointerException.class,
-                () -> PulsarSchemaUtils.createSchema(avro1.getSchemaInfo()));
+        SchemaInfo info1 = avro1.getSchemaInfo();
+        assertThrows(NullPointerException.class, () -> 
PulsarSchemaUtils.createSchema(info1));
 
         Schema<Foo> schema = 
PulsarSchemaUtils.createSchema(avro2.getSchemaInfo());
         assertNotEquals(schema.getSchemaInfo(), avro1.getSchemaInfo());
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index b28e449..94c5c83 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -40,7 +40,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
 
     // Defines pulsar running environment
     @TestExternalSystem
-    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
+    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.embedded());
 
     @TestSemantics
     CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
index bdfbb42..8409f63 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
@@ -64,8 +64,8 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
 
         assertEquals(expectedPartitions, topicPartitions);
 
-        operator().deleteTopic(TOPIC1, true);
-        operator().deleteTopic(TOPIC2, true);
+        operator().deleteTopic(TOPIC1);
+        operator().deleteTopic(TOPIC2);
     }
 
     @Test
@@ -91,8 +91,8 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
 
         assertEquals(expectedPartitions, topicPartitions);
 
-        operator().deleteTopic(TOPIC1, true);
-        operator().deleteTopic(TOPIC2, true);
-        operator().deleteTopic(TOPIC3, true);
+        operator().deleteTopic(TOPIC1);
+        operator().deleteTopic(TOPIC2);
+        operator().deleteTopic(TOPIC3);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
index f7cb120..a42741d 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
@@ -94,7 +94,7 @@ abstract class PulsarSourceReaderTestBase extends 
PulsarTestSuiteBase {
 
     @AfterEach
     void afterEach(String topicName) {
-        operator().deleteTopic(topicName, true);
+        operator().deleteTopic(topicName);
     }
 
     @TestTemplate
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index c6af529..f238a03 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -27,10 +27,6 @@ import java.util.List;
 
 /** Common test context for pulsar based test. */
 public abstract class PulsarTestContext<T> implements 
DataStreamSourceExternalContext<T> {
-    private static final long serialVersionUID = 1L;
-
-    private static final int NUM_RECORDS_UPPER_BOUND = 500;
-    private static final int NUM_RECORDS_LOWER_BOUND = 100;
 
     protected final PulsarRuntimeOperator operator;
     protected final List<URL> connectorJarPaths;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index b55fdc5..c87140b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
      * pulsar broker. Override this method when needs.
      */
     protected PulsarRuntime runtime() {
-        return PulsarRuntime.mock();
+        return PulsarRuntime.embedded();
     }
 
     /** Operate pulsar by acquiring a runtime operator. */
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/SampleData.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/SampleData.java
index ec54761..8fec4ff 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/SampleData.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/SampleData.java
@@ -18,117 +18,21 @@
 
 package org.apache.flink.connector.pulsar.testutils;
 
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Supplier;
-
-import static java.util.function.Function.identity;
-import static java.util.stream.Stream.generate;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
-import static org.apache.commons.lang3.RandomStringUtils.randomAscii;
-import static 
org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList.toImmutableList;
-import static 
org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap.toImmutableMap;
 
 /** Sample data for various test cases. */
 public class SampleData {
 
-    private static final Random RAND = new Random(System.currentTimeMillis());
-    private static final Supplier<Integer> LIST_SIZE = () -> RAND.nextInt(10) 
+ 5;
-    private static final long MIN_DAY = LocalDate.of(2000, 1, 1).toEpochDay();
-    private static final long MAX_DAY = LocalDate.of(2040, 12, 
31).toEpochDay();
-    private static final Supplier<Bar> BAR_SUPPLIER =
-            () -> new Bar(RAND.nextBoolean(), randomAlphanumeric(10));
-    private static final Supplier<List<Bar>> BAR_LIST_SUPPLIER =
-            () -> 
generate(BAR_SUPPLIER).limit(LIST_SIZE.get()).collect(toImmutableList());
-    private static final Supplier<Map<String, Bar>> BAR_MAP_SUPPLIER =
-            () ->
-                    generate(BAR_SUPPLIER)
-                            .limit(LIST_SIZE.get())
-                            .collect(toImmutableMap(Bar::toString, 
identity()));
-
     // --------------------------------//
     //                                 //
     // Random sample data for tests.   //
     //                                 //
     // --------------------------------//
 
-    public static final List<Boolean> BOOLEAN_LIST =
-            
generate(RAND::nextBoolean).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<Integer> INTEGER_LIST =
-            
generate(RAND::nextInt).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<byte[]> BYTES_LIST =
-            generate(() -> randomAscii(8))
-                    .limit(LIST_SIZE.get())
-                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
-                    .collect(toImmutableList());
-
-    public static final List<Byte> INT_8_LIST =
-            generate(RAND::nextInt)
-                    .limit(LIST_SIZE.get())
-                    .map(Integer::byteValue)
-                    .collect(toImmutableList());
-
-    public static final List<Short> INT_16_LIST =
-            generate(RAND::nextInt)
-                    .limit(LIST_SIZE.get())
-                    .map(Integer::shortValue)
-                    .collect(toImmutableList());
-
-    public static final List<Long> INT_64_LIST =
-            
generate(RAND::nextLong).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<Double> DOUBLE_LIST =
-            
generate(RAND::nextDouble).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<Float> FLOAT_LIST =
-            
generate(RAND::nextFloat).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<String> STRING_LIST =
-            generate(() -> 
randomAlphanumeric(8)).limit(LIST_SIZE.get()).collect(toImmutableList());
-
-    public static final List<LocalDate> LOCAL_DATE_LIST =
-            generate(() -> ThreadLocalRandom.current().nextLong(MIN_DAY, 
MAX_DAY))
-                    .limit(LIST_SIZE.get())
-                    .map(LocalDate::ofEpochDay)
-                    .collect(toImmutableList());
-
-    public static final List<LocalDateTime> LOCAL_DATE_TIME_LIST =
-            generate(() -> ThreadLocalRandom.current().nextLong(MIN_DAY, 
MAX_DAY))
-                    .limit(LIST_SIZE.get())
-                    .map(LocalDate::ofEpochDay)
-                    .map(LocalDate::atStartOfDay)
-                    .collect(toImmutableList());
-
-    public static final List<FA> FA_LIST =
-            generate(() -> new FA(BAR_LIST_SUPPLIER.get().toArray(new Bar[0])))
-                    .limit(LIST_SIZE.get())
-                    .collect(toImmutableList());
-
-    public static final List<Foo> FOO_LIST =
-            generate(() -> new Foo(RAND.nextInt(), RAND.nextFloat(), 
BAR_SUPPLIER.get()))
-                    .limit(LIST_SIZE.get())
-                    .collect(toImmutableList());
-
-    public static final List<FL> FL_LIST =
-            generate(() -> new FL(BAR_LIST_SUPPLIER.get()))
-                    .limit(LIST_SIZE.get())
-                    .collect(toImmutableList());
-
-    public static final List<FM> FM_LIST =
-            generate(() -> new FM(BAR_MAP_SUPPLIER.get()))
-                    .limit(LIST_SIZE.get())
-                    .collect(toImmutableList());
-
     /** Foo type. */
     public static class Foo {
         public int i;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 35c2b58..57027f3 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -31,7 +31,6 @@ import java.util.List;
  * splits.
  */
 public class MultipleTopicConsumingContext extends 
MultipleTopicTemplateContext {
-    private static final long serialVersionUID = 1L;
 
     public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
index 8c77cb5..3eca9e7 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
@@ -49,7 +49,6 @@ import static org.apache.pulsar.client.api.Schema.STRING;
  * source splits.
  */
 public abstract class MultipleTopicTemplateContext extends 
PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int numTopics = 0;
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index ab3db06..f5bfa45 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -47,7 +47,6 @@ import static 
org.apache.pulsar.client.api.SubscriptionType.Exclusive;
  * source splits.
  */
 public class SingleTopicConsumingContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
     private final String topicName;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index d46658e..9c1cd01 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -20,13 +20,15 @@ package org.apache.flink.connector.pulsar.testutils.runtime;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.embedded.PulsarEmbeddedRuntime;
 
 import org.testcontainers.containers.GenericContainer;
 
 /**
  * An abstraction for different pulsar runtimes. Providing the common methods 
for {@link
  * PulsarTestEnvironment}.
+ *
+ * <p>All the Pulsar runtime should enable the transaction by default.
  */
 public interface PulsarRuntime {
 
@@ -36,17 +38,43 @@ public interface PulsarRuntime {
     /** Shutdown this pulsar runtime. */
     void tearDown();
 
-    /** Return a operator for operating this pulsar runtime. */
+    /**
+     * Return an operator for operating this pulsar runtime. This operator 
predefines a set of
+     * extremely useful methods for Pulsar. You can easily add new methods in 
this operator.
+     */
     PulsarRuntimeOperator operator();
 
-    static PulsarRuntime mock() {
-        return new PulsarMockRuntime();
+    /**
+     * Create a standalone Pulsar instance in test thread. We would start an 
embedded zookeeper and
+     * bookkeeper. The stream storage for bookkeeper is disabled. The function 
worker is disabled on
+     * Pulsar broker.
+     *
+     * <p>This runtime would be faster than {@link #container()} and behaves 
the same as the
+     * {@link #container()}.
+     */
+    static PulsarRuntime embedded() {
+        return new PulsarEmbeddedRuntime();
     }
 
+    /**
+     * Create a Pulsar instance in docker. We would start a standalone Pulsar 
in TestContainers.
+     * This runtime is often used in end-to-end tests. The performance may be 
a bit slower than
+     * {@link #embedded()}. The stream storage for bookkeeper is disabled. The 
function worker is
+     * disabled on Pulsar broker.
+     */
     static PulsarRuntime container() {
         return new PulsarContainerRuntime();
     }
 
+    /**
+     * Create a Pulsar instance in docker. We would start a standalone Pulsar 
in TestContainers.
+     * This runtime is often used in end-to-end tests. The performance may be 
a bit slower than
+     * {@link #embedded()}. The stream storage for bookkeeper is disabled. The 
function worker is
+     * disabled on Pulsar broker.
+     *
+     * <p>We would link the created Pulsar docker instance with the given 
flink instance. This would
+     * enable the connection for Pulsar and Flink in docker environment.
+     */
     static PulsarRuntime container(GenericContainer<?> flinkContainer) {
         return new 
PulsarContainerRuntime().bindWithFlinkContainer(flinkContainer);
     }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index e53f7aa..a78ea99 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -19,7 +19,8 @@
 package org.apache.flink.connector.pulsar.testutils.runtime;
 
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.testframe.external.ExternalContext;
@@ -28,79 +29,139 @@ import 
org.apache.flink.shaded.guava30.com.google.common.base.Strings;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
 /**
- * A pulsar cluster operator is used for operating pulsar instance. It's 
serializable for using in
+ * A pulsar cluster operator used for operating pulsar instance. It's 
serializable for using in
  * {@link ExternalContext}.
  */
-public class PulsarRuntimeOperator implements Serializable, Closeable {
-    private static final long serialVersionUID = -630646912412751301L;
+public class PulsarRuntimeOperator implements Closeable {
 
     public static final int DEFAULT_PARTITIONS = 10;
     public static final int NUM_RECORDS_PER_PARTITION = 20;
+    public static final String SUBSCRIPTION_NAME = "PulsarRuntimeOperator";
 
-    private String serviceUrl;
-    private String adminUrl;
-
-    private transient PulsarClient client;
-    private transient PulsarAdmin admin;
+    private final String serviceUrl;
+    private final String adminUrl;
+    private final PulsarClient client;
+    private final PulsarAdmin admin;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, 
Producer<?>>> producers;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, 
Consumer<?>>> consumers;
 
     public PulsarRuntimeOperator(String serviceUrl, String adminUrl) {
-        this.serviceUrl = serviceUrl;
-        this.adminUrl = adminUrl;
-        initializeClients();
+        this(serviceUrl, serviceUrl, adminUrl, adminUrl);
+    }
+
+    public PulsarRuntimeOperator(
+            String serviceUrl,
+            String containerServiceUrl,
+            String adminUrl,
+            String containerAdminUrl) {
+        this.serviceUrl = containerServiceUrl;
+        this.adminUrl = containerAdminUrl;
+        this.client =
+                sneakyClient(
+                        () ->
+                                PulsarClient.builder()
+                                        .serviceUrl(serviceUrl)
+                                        .enableTransaction(true)
+                                        .build());
+        this.admin = sneakyClient(() -> 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
+        this.producers = new ConcurrentHashMap<>();
+        this.consumers = new ConcurrentHashMap<>();
     }
 
     /**
      * Create a topic with default {@link #DEFAULT_PARTITIONS} partitions and 
send a fixed number
      * {@link #NUM_RECORDS_PER_PARTITION} of records to this topic.
+     *
+     * @param topic Pulsar topic name, it couldn't be a name with partition 
index.
      */
     public void setupTopic(String topic) {
         Random random = new Random(System.currentTimeMillis());
         setupTopic(topic, Schema.STRING, () -> randomAlphanumeric(10 + 
random.nextInt(20)));
     }
 
+    /**
+     * Create a topic with default {@link #DEFAULT_PARTITIONS} partitions and 
send a fixed number
+     * {@link #NUM_RECORDS_PER_PARTITION} of records to this topic.
+     *
+     * @param topic Pulsar topic name, it couldn't be a name with partition 
index.
+     * @param schema The Pulsar schema for serializing records into bytes.
+     * @param supplier The supplier for providing the records which would be 
sent to Pulsar.
+     */
     public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> 
supplier) {
         setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION);
     }
 
+    /**
+     * Create a topic with default {@link #DEFAULT_PARTITIONS} partitions and 
send a fixed number of
+     * records to this topic.
+     *
+     * @param topic Pulsar topic name, it couldn't be a name with partition 
index.
+     * @param schema The Pulsar schema for serializing records into bytes.
+     * @param supplier The supplier for providing the records which would be 
sent to Pulsar.
+     * @param numRecordsPerSplit The number of records for a partition.
+     */
     public <T> void setupTopic(
             String topic, Schema<T> schema, Supplier<T> supplier, int 
numRecordsPerSplit) {
-        createTopic(topic, DEFAULT_PARTITIONS);
+        String topicName = topicName(topic);
+        createTopic(topicName, DEFAULT_PARTITIONS);
 
         // Make sure every topic partition has messages.
         for (int i = 0; i < DEFAULT_PARTITIONS; i++) {
-            String partitionName = 
TopicNameUtils.topicNameWithPartition(topic, i);
+            String partitionName = topicNameWithPartition(topic, i);
             List<T> messages =
                     
Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList());
 
@@ -108,27 +169,67 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         }
     }
 
+    /**
+     * Create a pulsar topic with given partition number.
+     *
+     * @param topic The name of the topic.
+     * @param numberOfPartitions The number of partitions. We would create a 
non-partitioned topic
+     *     if this number is zero.
+     */
     public void createTopic(String topic, int numberOfPartitions) {
         checkArgument(numberOfPartitions >= 0);
-        if (numberOfPartitions == 0) {
+        if (numberOfPartitions <= 0) {
             createNonPartitionedTopic(topic);
         } else {
             createPartitionedTopic(topic, numberOfPartitions);
         }
     }
 
+    /**
+     * Increase the partition number of the topic.
+     *
+     * @param topic The topic name.
+     * @param newPartitionsNum The new partition size which should exceed 
previous size.
+     */
     public void increaseTopicPartitions(String topic, int newPartitionsNum) {
+        PartitionedTopicMetadata metadata =
+                sneakyAdmin(() -> 
admin().topics().getPartitionedTopicMetadata(topic));
+        checkArgument(
+                metadata.partitions < newPartitionsNum,
+                "The new partition size which should exceed previous size.");
+
         sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, 
newPartitionsNum));
     }
 
-    public void deleteTopic(String topic, boolean isPartitioned) {
-        if (isPartitioned) {
-            sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topic));
+    /**
+     * Delete a Pulsar topic.
+     *
+     * @param topic The topic name.
+     */
+    public void deleteTopic(String topic) {
+        String topicName = topicName(topic);
+        PartitionedTopicMetadata metadata;
+
+        try {
+            metadata = admin().topics().getPartitionedTopicMetadata(topicName);
+        } catch (NotFoundException e) {
+            // This topic doesn't exist. Just skip deletion.
+            return;
+        } catch (PulsarAdminException e) {
+            sneakyThrow(e);
+            return;
+        }
+
+        removeConsumers(topic);
+        removeProducers(topic);
+        if (metadata.partitions <= 0) {
+            sneakyAdmin(() -> admin().topics().delete(topicName));
         } else {
-            sneakyAdmin(() -> admin().topics().delete(topic));
+            sneakyAdmin(() -> 
admin().topics().deletePartitionedTopic(topicName));
         }
     }
 
+    /** Convert the topic metadata into a list of topic partitions. */
     public List<TopicPartition> topicInfo(String topic) {
         try {
             return client().getPartitionsForTopic(topic).get().stream()
@@ -144,10 +245,31 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         }
     }
 
-    protected List<TopicPartition> topicsInfo(Collection<String> topics) {
-        return topics.stream().flatMap(s -> 
topicInfo(s).stream()).collect(toList());
+    /**
+     * Query a list of topics. Convert the topic metadata into a list of topic 
partitions. Return a
+     * mapping for topic and its partitions.
+     */
+    public Map<String, List<TopicPartition>> topicsInfo(String... topics) {
+        return topicsInfo(Arrays.asList(topics));
+    }
+
+    /**
+     * Query a list of topics. Convert the topic metadata into a list of topic 
partitions. Return a
+     * mapping for topic and its partitions.
+     */
+    public Map<String, List<TopicPartition>> topicsInfo(Collection<String> 
topics) {
+        return topics.stream().collect(toMap(identity(), this::topicInfo));
     }
 
+    /**
+     * Send a single message to Pulsar, return the message id after the ack 
from Pulsar.
+     *
+     * @param topic The name of the topic.
+     * @param schema The schema for serialization.
+     * @param message The record that needs to be sent.
+     * @param <T> The type of the record.
+     * @return message id.
+     */
     public <T> MessageId sendMessage(String topic, Schema<T> schema, T 
message) {
         List<MessageId> messageIds = sendMessages(topic, schema, 
singletonList(message));
         checkArgument(messageIds.size() == 1);
@@ -155,6 +277,16 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         return messageIds.get(0);
     }
 
+    /**
+     * Send a single message to Pulsar, return the message id after the ack 
from Pulsar.
+     *
+     * @param topic The name of the topic.
+     * @param schema The schema for serialization.
+     * @param key The message key.
+     * @param message The record that needs to be sent.
+     * @param <T> The type of the record.
+     * @return message id.
+     */
     public <T> MessageId sendMessage(String topic, Schema<T> schema, String 
key, T message) {
         List<MessageId> messageIds = sendMessages(topic, schema, key, 
singletonList(message));
         checkArgument(messageIds.size() == 1);
@@ -162,23 +294,42 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         return messageIds.get(0);
     }
 
+    /**
+     * Send a list of messages to Pulsar, return the message id set after the 
ack from Pulsar.
+     *
+     * @param topic The name of the topic.
+     * @param schema The schema for serialization.
+     * @param messages The records that need to be sent.
+     * @param <T> The type of the record.
+     * @return message id.
+     */
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, Collection<T> messages) {
         return sendMessages(topic, schema, null, messages);
     }
 
+    /**
+     * Send a list of messages to Pulsar, return the message id set after the ack 
from Pulsar.
+     *
+     * @param topic The name of the topic.
+     * @param schema The schema for serialization.
+     * @param key The message key.
+     * @param messages The records that need to be sent.
+     * @param <T> The type of the record.
+     * @return message id.
+     */
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, String key, Collection<T> 
messages) {
-        try (Producer<T> producer = 
client().newProducer(schema).topic(topic).create()) {
+        try {
+            Producer<T> producer = createProducer(topic, schema);
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
-                MessageId messageId;
-                if (Strings.isNullOrEmpty(key)) {
-                    messageId = producer.newMessage().value(message).send();
-                } else {
-                    messageId = 
producer.newMessage().key(key).value(message).send();
+                TypedMessageBuilder<T> builder = 
producer.newMessage().value(message);
+                if (!Strings.isNullOrEmpty(key)) {
+                    builder.key(key);
                 }
+                MessageId messageId = builder.send();
                 messageIds.add(messageId);
             }
 
@@ -189,22 +340,117 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         }
     }
 
+    /**
+     * Consume a message from the given Pulsar topic, this method would be 
blocked until we get a
+     * message from this topic.
+     */
+    public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
+        try {
+            Consumer<T> consumer = createConsumer(topic, schema);
+            return drainOneMessage(consumer);
+        } catch (PulsarClientException e) {
+            sneakyThrow(e);
+            return null;
+        }
+    }
+
+    /**
+     * Consume a message from the given Pulsar topic, this method would be 
blocked until we meet
+     * timeout. A null message would be returned if no message has been 
consumed from Pulsar.
+     */
+    public <T> Message<T> receiveMessage(String topic, Schema<T> schema, 
Duration timeout) {
+        try {
+            Consumer<T> consumer = createConsumer(topic, schema);
+            Message<T> message = 
consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
+            consumer.acknowledgeCumulative(message.getMessageId());
+
+            return message;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * Consume a fixed number of messages from the given Pulsar topic, this 
method would be blocked
+     * until we get the exact number of messages from this topic.
+     */
+    public <T> List<Message<T>> receiveMessages(String topic, Schema<T> 
schema, int counts) {
+        if (counts == 0) {
+            return emptyList();
+        } else if (counts < 0) {
+            // Drain all messages.
+            return receiveAllMessages(topic, schema, Duration.ofMinutes(1));
+        } else if (counts == 1) {
+            // Drain one message.
+            Message<T> message = receiveMessage(topic, schema);
+            return singletonList(message);
+        } else {
+            // Drain a fixed number of messages.
+            try {
+                Consumer<T> consumer = createConsumer(topic, schema);
+                List<Message<T>> messages = new ArrayList<>(counts);
+                for (int i = 0; i < counts; i++) {
+                    Message<T> message = drainOneMessage(consumer);
+                    messages.add(message);
+                }
+                return messages;
+            } catch (PulsarClientException e) {
+                sneakyThrow(e);
+                return emptyList();
+            }
+        }
+    }
+
+    /**
+     * Drain all the messages from the current topic. We keep consuming until a 
receive attempt
+     * yields no message within the timeout.
+     */
+    public <T> List<Message<T>> receiveAllMessages(
+            String topic, Schema<T> schema, Duration timeout) {
+        List<Message<T>> messages = new ArrayList<>();
+
+        Message<T> message = receiveMessage(topic, schema, timeout);
+        while (message != null) {
+            messages.add(message);
+            message = receiveMessage(topic, schema, timeout);
+        }
+
+        return messages;
+    }
+
+    /** Return the transaction coordinator client for operating {@link TxnID}. 
*/
+    public TransactionCoordinatorClient coordinatorClient() {
+        return ((PulsarClientImpl) client()).getTcClient();
+    }
+
+    /**
+     * Return the broker url for this Pulsar runtime. It's only used in flink 
environment. You can't
+     * create the {@link PulsarClient} by this broker url, use the {@link 
#client()} instead.
+     */
     public String serviceUrl() {
         return serviceUrl;
     }
 
+    /**
+     * Return the broker http url for this Pulsar runtime. It's only used in 
flink environment. You
+     * can't create the {@link PulsarAdmin} by this broker http url, use the 
{@link #admin()}
+     * instead.
+     */
     public String adminUrl() {
         return adminUrl;
     }
 
+    /** The client for creating producer and consumer. It's used in tests. */
     public PulsarClient client() {
         return client;
     }
 
+    /** The client for creating topics and querying other metadata, etc. It's 
used in tests. */
     public PulsarAdmin admin() {
         return admin;
     }
 
+    /** The configuration for constructing {@link PulsarConfiguration}. */
     public Configuration config() {
         Configuration configuration = new Configuration();
         configuration.set(PULSAR_SERVICE_URL, serviceUrl());
@@ -212,8 +458,25 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
         return configuration;
     }
 
+    /** Create the sink configuration with common settings. */
+    public Configuration sinkConfig(DeliveryGuarantee deliveryGuarantee) {
+        Configuration configuration = config();
+        configuration.set(PULSAR_WRITE_DELIVERY_GUARANTEE, deliveryGuarantee);
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            configuration.set(PULSAR_WRITE_TRANSACTION_TIMEOUT, 
Duration.ofMinutes(5).toMillis());
+            configuration.set(PULSAR_ENABLE_TRANSACTION, true);
+            configuration.set(PULSAR_SEND_TIMEOUT_MS, 0L);
+        }
+
+        return configuration;
+    }
+
+    /** This method is used for test framework. You can't close this operator 
manually. */
     @Override
     public void close() throws IOException {
+        producers.clear();
+        consumers.clear();
+
         if (admin != null) {
             admin.close();
         }
@@ -236,27 +499,94 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable {
     private void createPartitionedTopic(String topic, int numberOfPartitions) {
         try {
             admin().lookups().lookupPartitionedTopic(topic);
-            sneakyAdmin(() -> 
admin().topics().expireMessagesForAllSubscriptionsAsync(topic, 0));
+            sneakyAdmin(() -> 
admin().topics().expireMessagesForAllSubscriptions(topic, 0));
         } catch (PulsarAdminException e) {
             sneakyAdmin(() -> admin().topics().createPartitionedTopic(topic, 
numberOfPartitions));
         }
     }
 
-    private void initializeClients() {
-        this.client = sneakyClient(() -> 
PulsarClient.builder().serviceUrl(serviceUrl).build());
-        this.admin = sneakyClient(() -> 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
+    @SuppressWarnings("unchecked")
+    private <T> Producer<T> createProducer(String topic, Schema<T> schema)
+            throws PulsarClientException {
+        TopicName topicName = TopicName.get(topic);
+        String name = topicName.getPartitionedTopicName();
+        int index = topicName.getPartitionIndex();
+        ConcurrentHashMap<Integer, Producer<?>> topicProducers =
+                producers.computeIfAbsent(name, d -> new 
ConcurrentHashMap<>());
+
+        return (Producer<T>)
+                topicProducers.computeIfAbsent(
+                        index,
+                        i -> {
+                            try {
+                                return 
client().newProducer(schema).topic(topic).create();
+                            } catch (PulsarClientException e) {
+                                sneakyThrow(e);
+                                return null;
+                            }
+                        });
     }
 
-    // --------------------------- Serialization Logic 
-----------------------------
+    @SuppressWarnings("unchecked")
+    private <T> Consumer<T> createConsumer(String topic, Schema<T> schema)
+            throws PulsarClientException {
+        TopicName topicName = TopicName.get(topic);
+        String name = topicName.getPartitionedTopicName();
+        int index = topicName.getPartitionIndex();
+        ConcurrentHashMap<Integer, Consumer<?>> topicConsumers =
+                consumers.computeIfAbsent(name, d -> new 
ConcurrentHashMap<>());
+
+        return (Consumer<T>)
+                topicConsumers.computeIfAbsent(
+                        index,
+                        i -> {
+                            try {
+                                return client().newConsumer(schema)
+                                        .topic(topic)
+                                        .subscriptionName(SUBSCRIPTION_NAME)
+                                        .subscriptionMode(Durable)
+                                        .subscriptionType(Exclusive)
+                                        .subscriptionInitialPosition(
+                                                
SubscriptionInitialPosition.Earliest)
+                                        .subscribe();
+                            } catch (PulsarClientException e) {
+                                sneakyThrow(e);
+                                return null;
+                            }
+                        });
+    }
 
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-        oos.writeUTF(serviceUrl);
-        oos.writeUTF(adminUrl);
+    private void removeProducers(String topic) {
+        String topicName = topicName(topic);
+        ConcurrentHashMap<Integer, Producer<?>> integerProducers = 
producers.remove(topicName);
+        if (integerProducers != null) {
+            for (Producer<?> producer : integerProducers.values()) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    sneakyThrow(e);
+                }
+            }
+        }
+    }
+
+    private void removeConsumers(String topic) {
+        String topicName = topicName(topic);
+        ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = 
consumers.remove(topicName);
+        if (integerConsumers != null) {
+            for (Consumer<?> consumer : integerConsumers.values()) {
+                try {
+                    consumer.close();
+                } catch (PulsarClientException e) {
+                    sneakyThrow(e);
+                }
+            }
+        }
     }
 
-    private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
-        this.serviceUrl = ois.readUTF();
-        this.adminUrl = ois.readUTF();
-        initializeClients();
+    private <T> Message<T> drainOneMessage(Consumer<T> consumer) throws 
PulsarClientException {
+        Message<T> message = consumer.receive();
+        consumer.acknowledgeCumulative(message.getMessageId());
+        return message;
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 5560767..3d66728 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -28,16 +28,17 @@ import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.IOException;
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.DockerImageVersions.PULSAR;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
 import static org.testcontainers.containers.PulsarContainer.BROKER_PORT;
+import static org.testcontainers.containers.wait.strategy.Wait.forHttp;
 
 /**
  * {@link PulsarRuntime} implementation, use the TestContainers as the 
backend. We would start a
@@ -45,13 +46,14 @@ import static 
org.testcontainers.containers.PulsarContainer.BROKER_PORT;
  */
 public class PulsarContainerRuntime implements PulsarRuntime {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarContainerRuntime.class);
-    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
 
+    // The default host for connecting in docker environment.
+    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
     // This url is used on the container side.
-    public static final String PULSAR_SERVICE_URL =
+    private static final String PULSAR_SERVICE_URL =
             String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_PORT);
     // This url is used on the container side.
-    public static final String PULSAR_ADMIN_URL =
+    private static final String PULSAR_ADMIN_URL =
             String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_HTTP_PORT);
 
     /**
@@ -60,50 +62,75 @@ public class PulsarContainerRuntime implements 
PulsarRuntime {
      */
     private final PulsarContainer container = new 
PulsarContainer(DockerImageName.parse(PULSAR));
 
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private boolean boundFlink = false;
     private PulsarRuntimeOperator operator;
 
     public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> 
flinkContainer) {
+        checkArgument(
+                !started.get(),
+                "This Pulsar container has been started, we can't bind it to a 
Flink container.");
+
         this.container
                 .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME)
                 .dependsOn(flinkContainer)
                 .withNetwork(flinkContainer.getNetwork());
+        this.boundFlink = true;
         return this;
     }
 
     @Override
     public void startUp() {
-        // Prepare Pulsar Container.
+        boolean havenStartedBefore = started.compareAndSet(false, true);
+        if (!havenStartedBefore) {
+            LOG.warn("You have started the Pulsar Container. We will skip this 
execution.");
+            return;
+        }
+
+        // Override the default configuration in container for enabling the 
Pulsar transaction.
         container.withClasspathResourceMapping(
                 "containers/txnStandalone.conf",
                 "/pulsar/conf/standalone.conf",
                 BindMode.READ_ONLY);
-        container.addExposedPort(2181);
+        // Wait until the Pulsar broker is ready.
         container.waitingFor(
-                new HttpWaitStrategy()
+                forHttp("/admin/v2/namespaces/public/default")
                         .forPort(BROKER_HTTP_PORT)
                         .forStatusCode(200)
-                        .forPath("/admin/v2/namespaces/public/default")
                         .withStartupTimeout(Duration.ofMinutes(5)));
-
         // Start the Pulsar Container.
         container.start();
+        // Append the output to this runtime logger. Used for local debug 
purpose.
         container.followOutput(new 
Slf4jLogConsumer(LOG).withSeparateOutputStreams());
 
         // Create the operator.
-        this.operator =
-                new PulsarRuntimeOperator(
-                        container.getPulsarBrokerUrl(), 
container.getHttpServiceUrl());
+        if (boundFlink) {
+            this.operator =
+                    new PulsarRuntimeOperator(
+                            container.getPulsarBrokerUrl(),
+                            PULSAR_SERVICE_URL,
+                            container.getHttpServiceUrl(),
+                            PULSAR_ADMIN_URL);
+        } else {
+            this.operator =
+                    new PulsarRuntimeOperator(
+                            container.getPulsarBrokerUrl(), 
container.getHttpServiceUrl());
+        }
     }
 
     @Override
     public void tearDown() {
         try {
-            operator.close();
-            this.operator = null;
-        } catch (IOException e) {
+            if (operator != null) {
+                operator.close();
+                this.operator = null;
+            }
+            container.stop();
+            started.compareAndSet(true, false);
+        } catch (Exception e) {
             throw new IllegalStateException(e);
         }
-        container.stop();
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
new file mode 100644
index 0000000..d598e97
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.embedded;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.pulsar.broker.ServiceConfigurationUtils.brokerUrl;
+import static org.apache.pulsar.broker.ServiceConfigurationUtils.webServiceUrl;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/** Provides an embedded Pulsar server. We use this runtime for transaction related tests. */
+public class PulsarEmbeddedRuntime implements PulsarRuntime {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarEmbeddedRuntime.class);
+
+    private static final String CONFIG_FILE_PATH;
+
+    static {
+        // Find the absolute path for containers/txnStandalone.conf on the classpath.
+        // The URL is converted through a URI on purpose: URL#getFile() keeps percent-encoded
+        // characters (e.g. "%20" for a space), which would produce a non-existent path when
+        // the project lives in a directory whose name needs escaping.
+        ClassLoader classLoader = PulsarEmbeddedRuntime.class.getClassLoader();
+        URL resource =
+                checkNotNull(
+                        classLoader.getResource("containers/txnStandalone.conf"),
+                        "containers/txnStandalone.conf is missing on the classpath.");
+        try {
+            // File(URI) rejects non "file:" URIs, so a resource packed into a jar fails fast
+            // here instead of yielding a bogus path that only breaks later on.
+            CONFIG_FILE_PATH = new File(resource.toURI()).getAbsolutePath();
+        } catch (URISyntaxException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private final Path tempDir;
+
+    private LocalBookkeeperEnsemble bookkeeper;
+    private PulsarService pulsarService;
+    private PulsarRuntimeOperator operator;
+
+    /** Creates the runtime together with a fresh temporary directory for its data. */
+    public PulsarEmbeddedRuntime() {
+        this.tempDir = createTempDir();
+    }
+
+    @Override
+    public void startUp() {
+        try {
+            startBookkeeper();
+            startPulsarService();
+
+            // Create the operator.
+            this.operator = new PulsarRuntimeOperator(getBrokerUrl(), 
getWebServiceUrl());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void tearDown() {
+        try {
+            if (operator != null) {
+                operator.close();
+                this.operator = null;
+            }
+            if (pulsarService != null) {
+                pulsarService.close();
+            }
+            if (bookkeeper != null) {
+                bookkeeper.stop();
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        } finally {
+            removeTempDir(tempDir);
+        }
+    }
+
+    @Override
+    public PulsarRuntimeOperator operator() {
+        return checkNotNull(operator, "You should start this embedded Pulsar 
first.");
+    }
+
+    private Path createTempDir() {
+        try {
+            return Files.createTempDirectory("pulsar");
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Deletes the given directory through FileUtils; an I/O failure is rethrown unchecked. */
+    private void removeTempDir(Path tempDir) {
+        File directory = tempDir.normalize().toFile();
+        try {
+            FileUtils.deleteDirectory(directory);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Starts a local bookkeeper ensemble in standalone mode. Its configuration is read from the
+     * file behind CONFIG_FILE_PATH and its data is kept below the temporary directory.
+     */
+    public void startBookkeeper() throws Exception {
+        // Load the bookkeeper settings from the configuration file.
+        ServerConfiguration serverConfiguration = new ServerConfiguration();
+        serverConfiguration.loadConf(new File(CONFIG_FILE_PATH).toURI().toURL());
+
+        // Both data directories live in <tempDir>/data/standalone.
+        Path dataDir = tempDir.resolve(Paths.get("data", "standalone"));
+        String zkDir = dataDir.resolve("zookeeper").normalize().toString();
+        String bkDir = dataDir.resolve("bookkeeper").normalize().toString();
+
+        // Start Bookkeeper & zookeeper.
+        this.bookkeeper = new LocalBookkeeperEnsemble(1, 0, 0, zkDir, bkDir, true, "127.0.0.1");
+        bookkeeper.startStandalone(serverConfiguration, false);
+    }
+
+    /**
+     * Starts the Pulsar broker against the already running bookkeeper ensemble and creates the
+     * clusters, tenants, namespaces and the transaction coordinator topic required by the tests.
+     */
+    private void startPulsarService() throws Exception {
+        ServiceConfiguration config = loadServiceConfiguration();
+
+        this.pulsarService =
+                new PulsarService(
+                        config,
+                        new WorkerConfig(),
+                        Optional.empty(),
+                        (exitCode) -> {
+                            LOG.info("Halting standalone process with code {}", exitCode);
+                            LogManager.shutdown();
+                            Runtime.getRuntime().halt(exitCode);
+                        });
+
+        // Start Pulsar Broker.
+        pulsarService.start();
+
+        // Has to run after start(), the URLs are built from the ports reported by the broker.
+        createSampleData(config);
+    }
+
+    /** Loads the broker configuration file and adapts it to this embedded environment. */
+    private ServiceConfiguration loadServiceConfiguration() throws Exception {
+        ServiceConfiguration config;
+        try (FileInputStream inputStream = new FileInputStream(CONFIG_FILE_PATH)) {
+            config = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+
+        // Advertise the broker on the loopback address.
+        config.setAdvertisedAddress("127.0.0.1");
+        config.setClusterName("standalone");
+
+        // Use random port.
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+
+        // Connect to the zookeeper port reported by the local bookkeeper ensemble.
+        int zkPort = getZkPort();
+        String zkConnect = "127.0.0.1" + ":" + zkPort;
+        config.setZookeeperServers(zkConnect);
+        config.setConfigurationStoreServers(zkConnect);
+        config.setRunningStandalone(true);
+
+        return config;
+    }
+
+    /** Creates the sample data environment through a short-lived admin client. */
+    private void createSampleData(ServiceConfiguration config) throws Exception {
+        String webServiceUrl = getWebServiceUrl();
+        String brokerUrl = getBrokerUrl();
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl).build()) {
+            ClusterData clusterData =
+                    ClusterData.builder()
+                            .serviceUrl(webServiceUrl)
+                            .brokerServiceUrl(brokerUrl)
+                            .build();
+            String cluster = config.getClusterName();
+            createSampleNameSpace(admin, clusterData, cluster);
+
+            // Create default namespace
+            createNameSpace(
+                    admin,
+                    cluster,
+                    TopicName.PUBLIC_TENANT,
+                    TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
+
+            // Create Pulsar system namespace
+            createNameSpace(
+                    admin, cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
+
+            // Enable transaction
+            if (config.isTransactionCoordinatorEnabled()
+                    && !admin.namespaces()
+                            .getTopics(SYSTEM_NAMESPACE.toString())
+                            .contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
+                admin.topics()
+                        .createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+            }
+        }
+    }
+
+    /** Returns the zookeeper port reported by the bookkeeper ensemble, which must be non-null. */
+    private int getZkPort() {
+        return checkNotNull(bookkeeper).getZookeeperPort();
+    }
+
+    /**
+     * Builds the broker service URL for 127.0.0.1 from the listen port reported by the broker.
+     * An {@link IllegalStateException} is thrown if the broker reports no such port.
+     */
+    private String getBrokerUrl() {
+        return brokerUrl(
+                "127.0.0.1",
+                pulsarService.getBrokerListenPort().orElseThrow(IllegalStateException::new));
+    }
+
+    /**
+     * Builds the HTTP service URL for 127.0.0.1 from the HTTP port reported by the broker.
+     *
+     * @throws IllegalStateException if the broker reports no HTTP port. This is a state problem
+     *     and not an argument problem, the same exception type is used for the broker port.
+     */
+    private String getWebServiceUrl() {
+        Integer port =
+                pulsarService
+                        .getListenPortHTTP()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The Pulsar broker doesn't report a HTTP port."));
+        return webServiceUrl("127.0.0.1", port);
+    }
+
+    private void createSampleNameSpace(PulsarAdmin admin, ClusterData 
clusterData, String cluster)
+            throws PulsarAdminException {
+        // Create a sample namespace
+        String tenant = "sample";
+        String globalCluster = "global";
+        String namespace = tenant + "/ns1";
+
+        List<String> clusters = admin.clusters().getClusters();
+        if (!clusters.contains(cluster)) {
+            admin.clusters().createCluster(cluster, clusterData);
+        } else {
+            admin.clusters().updateCluster(cluster, clusterData);
+        }
+        // Create marker for "global" cluster
+        if (!clusters.contains(globalCluster)) {
+            admin.clusters().createCluster(globalCluster, 
ClusterData.builder().build());
+        }
+
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants()
+                    .createTenant(
+                            tenant,
+                            new TenantInfoImpl(
+                                    Collections.emptySet(), 
Collections.singleton(cluster)));
+        }
+
+        if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+            admin.namespaces().createNamespace(namespace);
+        }
+    }
+
+    /**
+     * Creates the given tenant and namespace if they don't exist. A newly created namespace is
+     * replicated to the given cluster only. Despite the parameter names, this method is used for
+     * the public/default namespace as well as for the Pulsar system namespace.
+     */
+    private void createNameSpace(
+            PulsarAdmin admin, String cluster, String publicTenant, String defaultNamespace)
+            throws PulsarAdminException {
+        boolean tenantMissing = !admin.tenants().getTenants().contains(publicTenant);
+        if (tenantMissing) {
+            TenantInfo tenantInfo =
+                    TenantInfo.builder()
+                            .adminRoles(Collections.emptySet())
+                            .allowedClusters(Collections.singleton(cluster))
+                            .build();
+            admin.tenants().createTenant(publicTenant, tenantInfo);
+        }
+
+        // An existing namespace is left untouched.
+        if (admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+            return;
+        }
+        admin.namespaces().createNamespace(defaultNamespace);
+        admin.namespaces()
+                .setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster));
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
deleted file mode 100644
index 8355a23..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.common.api.proto.BaseCommand;
-
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-
-/** No operation for this BrokerInterceptor implementation. */
-public class BlankBrokerInterceptor implements BrokerInterceptor {
-
-    @Override
-    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
-        // no-op
-    }
-
-    @Override
-    public void onConnectionClosed(ServerCnx cnx) {
-        // no-op
-    }
-
-    @Override
-    public void onWebserviceRequest(ServletRequest request) {
-        // no-op
-    }
-
-    @Override
-    public void onWebserviceResponse(ServletRequest request, ServletResponse 
response) {
-        // no-op
-    }
-
-    @Override
-    public void initialize(PulsarService pulsarService) {
-        // no-op
-    }
-
-    @Override
-    public void close() {
-        // no-op
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
deleted file mode 100644
index 41fad54..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import io.netty.channel.EventLoopGroup;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.pulsar.broker.BookKeeperClientFactory;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.zookeeper.ZooKeeper;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
-/** A BookKeeperClientFactory implementation which returns a mocked 
bookkeeper. */
-public class MockBookKeeperClientFactory implements BookKeeperClientFactory {
-
-    private final OrderedExecutor executor =
-            
OrderedExecutor.newBuilder().numThreads(1).name("mock-pulsar-bookkeeper").build();
-
-    private final BookKeeper bookKeeper = 
NonClosableMockBookKeeper.create(executor);
-
-    @Override
-    public BookKeeper create(
-            ServiceConfiguration conf,
-            ZooKeeper zkClient,
-            EventLoopGroup eventLoopGroup,
-            Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
-            Map<String, Object> ensemblePlacementPolicyProperties)
-            throws IOException {
-        return bookKeeper;
-    }
-
-    @Override
-    public BookKeeper create(
-            ServiceConfiguration conf,
-            ZooKeeper zkClient,
-            EventLoopGroup eventLoopGroup,
-            Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
-            Map<String, Object> ensemblePlacementPolicyProperties,
-            StatsLogger statsLogger)
-            throws IOException {
-        return bookKeeper;
-    }
-
-    @Override
-    public void close() {
-        try {
-            bookKeeper.close();
-            executor.shutdown();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
deleted file mode 100644
index 6b6c412..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.pulsar.broker.BookKeeperClientFactory;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.zookeeper.MockZooKeeperSession;
-
-import java.util.function.Supplier;
-
-/** A Mock pulsar service which would use the mocked zookeeper and bookkeeper. 
*/
-public class MockPulsarService extends PulsarService {
-
-    private final int brokerServicePort;
-
-    private final MockZooKeeperClientFactory zooKeeperClientFactory =
-            new MockZooKeeperClientFactory();
-
-    private final MockZooKeeperSession zooKeeperSession =
-            
MockZooKeeperSession.newInstance(zooKeeperClientFactory.getZooKeeper());
-
-    private final SameThreadOrderedSafeExecutor orderedExecutor =
-            new SameThreadOrderedSafeExecutor();
-
-    public MockPulsarService(ServiceConfiguration config) {
-        super(config);
-        this.brokerServicePort =
-                
config.getBrokerServicePort().orElseThrow(IllegalArgumentException::new);
-    }
-
-    public ZooKeeperClientFactory getZooKeeperClientFactory() {
-        return zooKeeperClientFactory;
-    }
-
-    public BookKeeperClientFactory newBookKeeperClientFactory() {
-        return new MockBookKeeperClientFactory();
-    }
-
-    public MetadataStoreExtended createLocalMetadataStore() {
-        return new ZKMetadataStore(zooKeeperSession);
-    }
-
-    public MetadataStoreExtended createConfigurationMetadataStore() {
-        return new ZKMetadataStore(zooKeeperSession);
-    }
-
-    public Supplier<NamespaceService> getNamespaceServiceProvider() {
-        return () -> new NamespaceService(this);
-    }
-
-    @Override
-    public OrderedExecutor getOrderedExecutor() {
-        return orderedExecutor;
-    }
-
-    @Override
-    public BrokerInterceptor getBrokerInterceptor() {
-        return new BlankBrokerInterceptor();
-    }
-
-    public int getBrokerServicePort() {
-        return brokerServicePort;
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
deleted file mode 100644
index 3c89484..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.MockZooKeeper;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static 
org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl.ENCODING_SCHEME;
-import static org.apache.zookeeper.CreateMode.PERSISTENT;
-
-/** A ZooKeeperClientFactory implementation which returns mocked zookeeper 
instead of normal zk. */
-public class MockZooKeeperClientFactory implements ZooKeeperClientFactory {
-
-    private final MockZooKeeper zooKeeper;
-
-    public MockZooKeeperClientFactory() {
-        this.zooKeeper = 
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
-        List<ACL> dummyAclList = new ArrayList<>(0);
-
-        try {
-            ZkUtils.createFullPathOptimistic(
-                    zooKeeper,
-                    "/ledgers/available/192.168.1.1:" + 5000,
-                    "".getBytes(ENCODING_SCHEME),
-                    dummyAclList,
-                    PERSISTENT);
-
-            zooKeeper.create(
-                    "/ledgers/LAYOUT",
-                    "1\nflat:1".getBytes(ENCODING_SCHEME),
-                    dummyAclList,
-                    PERSISTENT);
-        } catch (KeeperException | InterruptedException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public CompletableFuture<ZooKeeper> create(
-            String serverList, SessionType sessionType, int 
zkSessionTimeoutMillis) {
-        return CompletableFuture.completedFuture(zooKeeper);
-    }
-
-    MockZooKeeper getZooKeeper() {
-        return zooKeeper;
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
deleted file mode 100644
index b7001b8..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-
-/**
- * Prevent the MockBookKeeper instance from being closed when the broker is 
restarted within a test.
- */
-public class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
-
-    private NonClosableMockBookKeeper(OrderedExecutor executor) throws 
Exception {
-        super(executor);
-    }
-
-    @Override
-    public void close() {
-        // no-op
-    }
-
-    @Override
-    public void shutdown() {
-        // no-op
-    }
-
-    public void reallyShutdown() {
-        super.shutdown();
-    }
-
-    public static BookKeeper create(OrderedExecutor executor) {
-        try {
-            return new NonClosableMockBookKeeper(executor);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
deleted file mode 100644
index b6f8aa4..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
-
-import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-
-import java.util.Optional;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
-import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** Providing a mocked pulsar server. */
-public class PulsarMockRuntime implements PulsarRuntime {
-
-    private static final String CLUSTER_NAME = "mock-pulsar-" + 
randomAlphanumeric(6);
-    private final MockPulsarService pulsarService;
-    private PulsarRuntimeOperator operator;
-
-    public PulsarMockRuntime() {
-        this(createConfig());
-    }
-
-    public PulsarMockRuntime(ServiceConfiguration configuration) {
-        this.pulsarService = new MockPulsarService(configuration);
-    }
-
-    @Override
-    public void startUp() {
-        try {
-            pulsarService.start();
-        } catch (PulsarServerException e) {
-            throw new IllegalStateException(e);
-        }
-        this.operator =
-                new PulsarRuntimeOperator(
-                        pulsarService.getBrokerServiceUrl(), 
pulsarService.getWebServiceAddress());
-
-        // Successfully start a pulsar broker, we have to create the required 
resources.
-        sneakyAdmin(this::createTestResource);
-    }
-
-    @Override
-    public void tearDown() {
-        try {
-            pulsarService.close();
-            operator.close();
-            this.operator = null;
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public PulsarRuntimeOperator operator() {
-        return checkNotNull(operator, "You should start this mock pulsar 
first.");
-    }
-
-    private void createTestResource() throws PulsarAdminException {
-        PulsarAdmin admin = operator().admin();
-        if (!admin.clusters().getClusters().contains(CLUSTER_NAME)) {
-            // Make clients can test short names
-            ClusterData data =
-                    ClusterData.builder()
-                            .serviceUrl("http://127.0.0.1:"; + 
pulsarService.getBrokerServicePort())
-                            .build();
-            admin.clusters().createCluster(CLUSTER_NAME, data);
-        }
-
-        createOrUpdateTenant("public");
-        createOrUpdateNamespace("public", "default");
-
-        createOrUpdateTenant("pulsar");
-        createOrUpdateNamespace("pulsar", "system");
-    }
-
-    private void createOrUpdateTenant(String tenant) throws 
PulsarAdminException {
-        PulsarAdmin admin = operator().admin();
-        TenantInfo info =
-                TenantInfo.builder()
-                        .adminRoles(ImmutableSet.of("appid1", "appid2"))
-                        .allowedClusters(ImmutableSet.of(CLUSTER_NAME))
-                        .build();
-        if (!admin.tenants().getTenants().contains(tenant)) {
-            admin.tenants().createTenant(tenant, info);
-        } else {
-            admin.tenants().updateTenant(tenant, info);
-        }
-    }
-
-    public void createOrUpdateNamespace(String tenant, String namespace)
-            throws PulsarAdminException {
-        PulsarAdmin admin = operator().admin();
-        String namespaceValue = tenant + "/" + namespace;
-        if 
(!admin.namespaces().getNamespaces(tenant).contains(namespaceValue)) {
-            admin.namespaces().createNamespace(namespaceValue);
-            admin.namespaces().setRetention(namespaceValue, new 
RetentionPolicies(60, 1000));
-        }
-    }
-
-    private static ServiceConfiguration createConfig() {
-        ServiceConfiguration configuration = new ServiceConfiguration();
-
-        configuration.setAdvertisedAddress("localhost");
-        configuration.setClusterName(CLUSTER_NAME);
-
-        configuration.setManagedLedgerCacheSizeMB(8);
-        configuration.setActiveConsumerFailoverDelayTimeMillis(0);
-        configuration.setDefaultRetentionTimeInMinutes(7);
-        configuration.setDefaultNumberOfNamespaceBundles(1);
-        configuration.setZookeeperServers("localhost:2181");
-        configuration.setConfigurationStoreServers("localhost:3181");
-
-        configuration.setAuthenticationEnabled(false);
-        configuration.setAuthorizationEnabled(false);
-        configuration.setAllowAutoTopicCreation(true);
-        configuration.setBrokerDeleteInactiveTopicsEnabled(false);
-
-        configuration.setWebSocketServiceEnabled(false);
-        // Use runtime dynamic ports
-        configuration.setBrokerServicePort(Optional.of(0));
-        configuration.setWebServicePort(Optional.of(0));
-
-        // Enable transaction with in memory.
-        configuration.setTransactionCoordinatorEnabled(true);
-        configuration.setTransactionMetadataStoreProviderClassName(
-                
"org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider");
-        configuration.setTransactionBufferProviderClassName(
-                
"org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider");
-
-        return configuration;
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
deleted file mode 100644
index 9667f08..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.mock;
-
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
-
-/** Override the default bookkeeper executor for executing in one thread 
executor. */
-public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
-
-    public SameThreadOrderedSafeExecutor() {
-        super(
-                "same-thread-executor",
-                1,
-                new DefaultThreadFactory("test"),
-                NullStatsLogger.INSTANCE,
-                false,
-                false,
-                100000,
-                -1,
-                false);
-    }
-
-    @Override
-    public void execute(Runnable r) {
-        r.run();
-    }
-
-    @Override
-    public void executeOrdered(int orderingKey, SafeRunnable r) {
-        r.run();
-    }
-
-    @Override
-    public void executeOrdered(long orderingKey, SafeRunnable r) {
-        r.run();
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
 
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
index f1a40365..bf35c59 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
@@ -1,15 +1,20 @@
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #
 
 ### --- General broker settings --- ###
@@ -28,6 +33,9 @@ webServicePort=8080
 # Hostname or IP address the service binds on, default is 0.0.0.0.
 bindAddress=0.0.0.0
 
+# Extra bind addresses for the service: 
<listener_name>:<scheme>://<host>:<port>,[...]
+bindAddresses=
+
 # Hostname or IP address the service advertises to the outside world. If not 
set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
@@ -94,12 +102,19 @@ backlogQuotaDefaultLimitSecond=-1
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
-# Enable the deletion of inactive topics
+# Enable the deletion of inactive topics. This parameter needs to cooperate 
with the allowAutoTopicCreation parameter.
+# If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that 
allowAutoTopicCreation is also set to true.
 brokerDeleteInactiveTopicsEnabled=true
 
 # How often to check for inactive topics
 brokerDeleteInactiveTopicsFrequencySeconds=60
 
+# Allow you to delete a tenant forcefully.
+forceDeleteTenantAllowed=false
+
+# Allow you to delete a namespace forcefully.
+forceDeleteNamespaceAllowed=false
+
 # Max pending publish requests per connection to avoid keeping large number of 
pending
 # requests in memory. Default: 1000
 maxPendingPublishRequestsPerConnection=1000
@@ -107,6 +122,10 @@ maxPendingPublishRequestsPerConnection=1000
 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
 
+# Check between intervals to see if max message size in topic policies has 
been updated.
+# Default is 60s
+maxMessageSizeCheckIntervalInSeconds=60
+
 # How long to delay rewinding cursor and dispatching messages when active 
consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
@@ -157,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
 maxTopicsPerNamespace=0
 
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
@@ -215,6 +238,10 @@ dispatchThrottlingRatePerTopicInMsg=0
 # default message-byte dispatch-throttling
 dispatchThrottlingRatePerTopicInByte=0
 
+# Apply dispatch rate limiting on batch messages instead of individual
+# messages within a batch message. (Default is disabled)
+dispatchThrottlingOnBatchMessageEnabled=false
+
 # Dispatch rate-limiting relative to publish rate.
 # (Enabling flag will make broker to dynamically update dispatch-rate 
relatively to publish-rate:
 # throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
@@ -224,6 +251,15 @@ dispatchThrottlingRateRelativeToPublishRate=false
 # backlog.
 dispatchThrottlingOnNonBacklogConsumerEnabled=true
 
+# The read failure backoff initial time in milliseconds. By default it is 15s.
+dispatcherReadFailureBackoffInitialTimeInMs=15000
+
+# The read failure backoff max time in milliseconds. By default it is 60s.
+dispatcherReadFailureBackoffMaxTimeInMs=60000
+
+# The read failure backoff mandatory stop time in milliseconds. By default it 
is 0s.
+dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+
 # Precise dispathcer flow control according to history message number of each 
entry
 preciseDispatcherFlowControl=false
 
@@ -284,6 +320,20 @@ maxConsumersPerSubscription=0
 # Use 0 or negative number to disable the check
 maxNumPartitionsPerPartitionedTopic=0
 
+### --- Metadata Store --- ###
+
+# Whether we should enable metadata operations batching
+metadataStoreBatchingEnabled=true
+
+# Maximum delay to impose on batching grouping
+metadataStoreBatchingMaxDelayMillis=5
+
+# Maximum number of operations to include in a singular batch
+metadataStoreBatchingMaxOperations=1000
+
+# Maximum size of a batch
+metadataStoreBatchingMaxSizeKb=128
+
 ### --- TLS --- ###
 # Deprecated - Use webServicePortTls and brokerServicePortTls instead
 tlsEnabled=false
@@ -585,7 +635,7 @@ managedLedgerDefaultAckQuorum=1
 
 # How frequently to flush the cursor positions that were accumulated due to 
rate limiting. (seconds).
 # Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds = 60
+managedLedgerCursorPositionFlushSeconds=60
 
 # Default type of checksum to use when writing to BookKeeper. Default is 
"CRC32C"
 # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
@@ -622,10 +672,11 @@ managedLedgerCursorBackloggedThreshold=1000
 managedLedgerDefaultMarkDeleteRateLimit=0.1
 
 # Max number of entries to append to a ledger before triggering a rollover
-# A ledger rollover is triggered on these conditions
-#  * Either the max rollover time has been reached
-#  * or max entries have been written to the ledger and at least min-time
-#    has passed
+# A ledger rollover is triggered after the min rollover time has passed
+# and one of the following conditions is true:
+#  * The max rollover time has been reached
+#  * The max entries have been written to the ledger
+#  * The max ledger size has been written to the ledger
 managedLedgerMaxEntriesPerLedger=50000
 
 # Minimum time between ledger rollover for a topic
@@ -714,7 +765,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
 # some over-loaded broker to other under-loaded brokers
 loadBalancerSheddingIntervalMinutes=1
 
-# Prevent the same topics to be shed and moved to other broker more that once 
within this timeframe
+# Prevent the same topics to be shed and moved to other broker more than once 
within this timeframe
 loadBalancerSheddingGracePeriodMinutes=30
 
 # Usage threshold to allocate max number of topics to broker
@@ -778,6 +829,9 @@ loadBalancerDirectMemoryResourceWeight=1.0
 # It only takes effect in the ThresholdShedder strategy.
 loadBalancerBundleUnloadMinThroughputThreshold=10
 
+# Time to wait for the unloading of a namespace bundle
+namespaceBundleUnloadingTimeoutMs=60000
+
 ### --- Replication --- ###
 
 # Enable replication metrics
@@ -844,6 +898,15 @@ exposePublisherStats=true
 # Default is false.
 exposePreciseBacklogInPrometheus=false
 
+# Enable splitting topic and partition label in Prometheus.
+# If enabled, a topic name will split into 2 parts, one is topic name without 
partition index,
+# another one is partition index, e.g. (topic=xxx, partition=0).
+# If the topic is a non-partitioned topic, -1 will be used for the partition 
index.
+# If disabled, one label to represent the topic and partition, e.g. 
(topic=xxx-partition-0)
+# Default is false.
+
+splitTopicAndPartitionLabelInPrometheus=false
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
@@ -904,7 +967,7 @@ journalSyncData=false
 
 # For each ledger dir, maximum disk space which can be used.
 # Default is 0.95f. i.e. 95% of disk can be used at most after which nothing 
will
-# be written to that partition. If all ledger dir partitions are full, then 
bookie
+# be written to that partition. If all ledger dir partitions are full, then 
bookie
 # will turn to readonly mode if 'readOnlyModeEnabled=true' is set, else it will
 # shutdown.
 # Valid values should be in between 0 and 1 (exclusive).
@@ -952,8 +1015,7 @@ defaultNumPartitions=1
 ### --- Transaction config variables --- ###
 # Enable transaction coordinator in broker
 transactionCoordinatorEnabled=true
-; 
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
 
 # Transaction buffer take snapshot transaction count
 transactionBufferSnapshotMaxTransactionCount=1000
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
index 38c82ae..47cd1d9 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
@@ -332,7 +332,7 @@ public class FlinkContainers implements BeforeAllCallback, 
AfterAllCallback {
             restClusterClient.close();
         }
         final Configuration clientConfiguration = new Configuration();
-        clientConfiguration.set(RestOptions.ADDRESS, "localhost");
+        clientConfiguration.set(RestOptions.ADDRESS, getJobManagerHost());
         clientConfiguration.set(
                 RestOptions.PORT, 
jobManager.getMappedPort(conf.get(RestOptions.PORT)));
         return new RestClusterClient<>(clientConfiguration, 
StandaloneClusterId.getInstance());
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 7d22e80..502b41d 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -29,8 +28,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 
 /**
  * Pulsar E2E test based on connector testing framework. It's used for 
Failover & Exclusive
@@ -48,8 +46,7 @@ public class PulsarSourceOrderedE2ECase extends 
SourceTestSuiteBase<String> {
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new 
PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @TestContext
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index d14d8f9..5039048 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -28,10 +27,9 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
-
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Shared 
& Key_Shared
  * subscription.
@@ -48,8 +46,7 @@ public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<S
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new 
PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @SuppressWarnings("unused")
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
index 1245e14..6fea0c9 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link 
SubscriptionType#Exclusive} subscription. */
 public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext 
{
-    private static final long serialVersionUID = 1L;
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class ExclusiveSubscriptionContext extends 
MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Exclusive;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
index 8ec1685..c473488 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link 
SubscriptionType#Failover} subscription. */
 public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
-    private static final long serialVersionUID = 1L;
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class FailoverSubscriptionContext extends 
MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Failover;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index 303783a..5ad369b 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -46,13 +46,10 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consume from test splits by using {@link 
SubscriptionType#Key_Shared} subscription. */
 public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -92,8 +89,8 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(
                                 "pulsar-[0-9]+-key-shared", 
RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Key_Shared)
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index de53595..1a2db66 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -40,13 +40,10 @@ import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consuming from test splits by using {@link 
SubscriptionType#Shared} subscription. */
 public class SharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -71,8 +68,8 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern("pulsar-[0-9]+-shared", 
RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Shared)
                         .setSubscriptionName("pulsar-shared");
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
new file mode 100644
index 0000000..654347b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/** This test environment is used to create a Pulsar standalone instance for 
e2e tests. */
+public class PulsarContainerTestEnvironment extends PulsarTestEnvironment {
+
+    public PulsarContainerTestEnvironment(FlinkContainerWithPulsarEnvironment 
flinkEnvironment) {
+        
super(container(flinkEnvironment.getFlinkContainers().getJobManager()));
+    }
+}

Reply via email to