This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ef5d76cf6e5a1816033582360022d0938f6a0fd
Author: Yufan Sheng <[email protected]>
AuthorDate: Wed Aug 17 07:02:53 2022 +0800

    [FLINK-28934][Connector/pulsar] Fix split assignment in different Pulsar subscriptions.
---
 .../pulsar/common/schema/PulsarSchema.java         |  12 +-
 .../pulsar/common/utils/PulsarSerdeUtils.java      |   5 +-
 .../connector/pulsar/source/PulsarSource.java      |  12 +-
 .../source/enumerator/PulsarSourceEnumState.java   |  54 +-----
 .../PulsarSourceEnumStateSerializer.java           |  51 ++---
 .../source/enumerator/PulsarSourceEnumerator.java  |  52 +++++-
 .../assigner/NonSharedSplitAssigner.java           | 100 +++++-----
 .../enumerator/assigner/SharedSplitAssigner.java   | 108 +++--------
 .../source/enumerator/assigner/SplitAssigner.java  |   7 +-
 .../enumerator/assigner/SplitAssignerBase.java     | 102 ++++++++++
 .../enumerator/assigner/SplitAssignerFactory.java  |  43 ++---
 .../source/enumerator/cursor/CursorPosition.java   |   6 +
 .../fetcher/PulsarUnorderedFetcherManager.java     |   8 +-
 .../reader/source/PulsarUnorderedSourceReader.java |  34 ++++
 .../split/PulsarPartitionSplitReaderBase.java      |  15 +-
 .../split/PulsarUnorderedPartitionSplitReader.java |   9 +-
 .../PulsarSourceEnumStateSerializerTest.java       |  24 +--
 .../enumerator/PulsarSourceEnumeratorTest.java     | 205 +++++++++------------
 .../assigner/NonSharedSplitAssignerTest.java       |  64 ++++---
 .../assigner/SharedSplitAssignerTest.java          |  43 ++++-
 .../enumerator/assigner/SplitAssignerTestBase.java |  55 ++++--
 21 files changed, 519 insertions(+), 490 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index 4c33d79d205..bb09315e915 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.pulsar.common.schema;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.IOUtils;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -52,8 +51,9 @@ import static 
org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo.encodeKeyV
  * A wrapper for Pulsar {@link Schema}, make it serializable and can be 
created from {@link
  * SchemaInfo}.
  *
- * <p>General pulsar schema info (avro, json, protobuf and keyvalue) don't 
contain the require class
- * info. We have to urge user to provide the related type class and encoded it 
into schema info.
+ * <p>General pulsar schema info (avro, json, protobuf and keyvalue) don't 
contain the required
+ * class info. We have to urge users to provide the related type class and 
encode it into schema
+ * info.
  */
 @Internal
 public final class PulsarSchema<T> implements Serializable {
@@ -164,7 +164,7 @@ public final class PulsarSchema<T> implements Serializable {
         // Schema
         int byteLen = ois.readInt();
         byte[] schemaBytes = new byte[byteLen];
-        IOUtils.readFully(ois, schemaBytes, 0, byteLen);
+        ois.readFully(schemaBytes);
 
         // Type
         int typeIdx = ois.readInt();
@@ -210,8 +210,8 @@ public final class PulsarSchema<T> implements Serializable {
     }
 
     /**
-     * We would throw exception if schema type is protobuf and user don't 
provide protobuf-java in
-     * class path.
+     * We would throw exception if the schema type is protobuf and users don't 
provide protobuf-java
+     * in the class path.
      */
     private void validateSchemaInfo(SchemaInfo info) {
         SchemaType type = info.getType();
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java
index 9f64172a504..93609bc720d 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java
@@ -50,10 +50,7 @@ public final class PulsarSerdeUtils {
     public static byte[] deserializeBytes(DataInputStream in) throws 
IOException {
         int size = in.readInt();
         byte[] bytes = new byte[size];
-        int result = in.read(bytes);
-        if (result < 0) {
-            throw new IOException("Couldn't deserialize the object, wrong byte 
buffer.");
-        }
+        in.readFully(bytes);
 
         return bytes;
     }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index 4ada27cecdc..84f2315b14a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -32,8 +32,6 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
-import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -145,31 +143,29 @@ public final class PulsarSource<OUT>
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> 
createEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
-        SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, 
sourceConfiguration);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
+                stopCursor,
                 rangeGenerator,
                 configuration,
                 sourceConfiguration,
-                enumContext,
-                splitAssigner);
+                enumContext);
     }
 
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> 
restoreEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             PulsarSourceEnumState checkpoint) {
-        SplitAssigner splitAssigner =
-                SplitAssignerFactory.create(stopCursor, sourceConfiguration, 
checkpoint);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
+                stopCursor,
                 rangeGenerator,
                 configuration,
                 sourceConfiguration,
                 enumContext,
-                splitAssigner);
+                checkpoint);
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
index 56bbbd20a32..15c88fe3f0a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
@@ -20,11 +20,8 @@ package org.apache.flink.connector.pulsar.source.enumerator;
 
 import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -36,63 +33,16 @@ public class PulsarSourceEnumState {
     /** The topic partitions that have been appended to this source. */
     private final Set<TopicPartition> appendedPartitions;
 
-    /**
-     * We convert the topic partition into a split and add to this pending 
list for assigning to a
-     * reader. It is used for Key_Shared, Failover, Exclusive subscription.
-     */
-    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
-
-    /**
-     * It is used for Shared subscription. When a reader is crashed in Shared 
subscription, its
-     * splits would be put in here.
-     */
-    private final Map<Integer, Set<PulsarPartitionSplit>> 
sharedPendingPartitionSplits;
-
-    /**
-     * It is used for Shared subscription. Every {@link PulsarPartitionSplit} 
should be assigned for
-     * all flink readers. Using this map for recording assign status.
-     */
-    private final Map<Integer, Set<String>> readerAssignedSplits;
-
-    /** The pipeline has been triggered and topic partitions have been 
assigned to readers. */
-    private final boolean initialized;
-
-    public PulsarSourceEnumState(
-            Set<TopicPartition> appendedPartitions,
-            Set<PulsarPartitionSplit> pendingPartitionSplits,
-            Map<Integer, Set<PulsarPartitionSplit>> 
pendingSharedPartitionSplits,
-            Map<Integer, Set<String>> readerAssignedSplits,
-            boolean initialized) {
+    public PulsarSourceEnumState(Set<TopicPartition> appendedPartitions) {
         this.appendedPartitions = appendedPartitions;
-        this.pendingPartitionSplits = pendingPartitionSplits;
-        this.sharedPendingPartitionSplits = pendingSharedPartitionSplits;
-        this.readerAssignedSplits = readerAssignedSplits;
-        this.initialized = initialized;
     }
 
     public Set<TopicPartition> getAppendedPartitions() {
         return appendedPartitions;
     }
 
-    public Set<PulsarPartitionSplit> getPendingPartitionSplits() {
-        return pendingPartitionSplits;
-    }
-
-    public Map<Integer, Set<PulsarPartitionSplit>> 
getSharedPendingPartitionSplits() {
-        return sharedPendingPartitionSplits;
-    }
-
-    public Map<Integer, Set<String>> getReaderAssignedSplits() {
-        return readerAssignedSplits;
-    }
-
-    public boolean isInitialized() {
-        return initialized;
-    }
-
     /** The initial assignment state for Pulsar. */
     public static PulsarSourceEnumState initialState() {
-        return new PulsarSourceEnumState(
-                new HashSet<>(), new HashSet<>(), new HashMap<>(), new 
HashMap<>(), false);
+        return new PulsarSourceEnumState(new HashSet<>());
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
index 7d410b0441f..af5e85c1b3b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
@@ -30,18 +30,19 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeMap;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeSet;
-import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeMap;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeSet;
 
 /** The {@link SimpleVersionedSerializer Serializer} for the enumerator state 
of Pulsar source. */
 public class PulsarSourceEnumStateSerializer
         implements SimpleVersionedSerializer<PulsarSourceEnumState> {
 
+    // This version should be bumped after modifying the PulsarSourceEnumState.
+    public static final int CURRENT_VERSION = 1;
+
     public static final PulsarSourceEnumStateSerializer INSTANCE =
             new PulsarSourceEnumStateSerializer();
 
@@ -54,33 +55,15 @@ public class PulsarSourceEnumStateSerializer
 
     @Override
     public int getVersion() {
-        // We use PulsarPartitionSplitSerializer's version because we use 
reuse this class.
-        return PulsarPartitionSplitSerializer.CURRENT_VERSION;
+        return CURRENT_VERSION;
     }
 
     @Override
     public byte[] serialize(PulsarSourceEnumState obj) throws IOException {
-        // VERSION 0 serialization
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
             serializeSet(
                     out, obj.getAppendedPartitions(), 
SPLIT_SERIALIZER::serializeTopicPartition);
-            serializeSet(
-                    out,
-                    obj.getPendingPartitionSplits(),
-                    SPLIT_SERIALIZER::serializePulsarPartitionSplit);
-            serializeMap(
-                    out,
-                    obj.getSharedPendingPartitionSplits(),
-                    DataOutputStream::writeInt,
-                    (o, v) -> serializeSet(o, v, 
SPLIT_SERIALIZER::serializePulsarPartitionSplit));
-            serializeMap(
-                    out,
-                    obj.getReaderAssignedSplits(),
-                    DataOutputStream::writeInt,
-                    (o, v) -> serializeSet(o, v, DataOutputStream::writeUTF));
-            out.writeBoolean(obj.isInitialized());
-
             out.flush();
             return baos.toByteArray();
         }
@@ -88,23 +71,21 @@ public class PulsarSourceEnumStateSerializer
 
     @Override
     public PulsarSourceEnumState deserialize(int version, byte[] serialized) 
throws IOException {
-        // VERSION 0 deserialization
+        // VERSION 1 deserialization, support VERSION 0 deserialization in the 
meantime.
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 DataInputStream in = new DataInputStream(bais)) {
             Set<TopicPartition> partitions = deserializeSet(in, 
deserializePartition(version));
-            Set<PulsarPartitionSplit> splits = deserializeSet(in, 
deserializeSplit(version));
-            Map<Integer, Set<PulsarPartitionSplit>> sharedSplits =
-                    deserializeMap(
-                            in,
-                            DataInput::readInt,
-                            i -> deserializeSet(i, deserializeSplit(version)));
-            Map<Integer, Set<String>> mapping =
-                    deserializeMap(
-                            in, DataInput::readInt, i -> deserializeSet(i, 
DataInput::readUTF));
-            boolean initialized = in.readBoolean();
-
-            return new PulsarSourceEnumState(
-                    partitions, splits, sharedSplits, mapping, initialized);
+
+            // Only deserialize these fields for backward compatibility.
+            if (version == 0) {
+                deserializeSet(in, deserializeSplit(version));
+                deserializeMap(
+                        in, DataInput::readInt, i -> deserializeSet(i, 
deserializeSplit(version)));
+                deserializeMap(in, DataInput::readInt, i -> deserializeSet(i, 
DataInput::readUTF));
+                in.readBoolean();
+            }
+
+            return new PulsarSourceEnumState(partitions);
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 8d65888283a..8ed7be7cf73 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
@@ -46,8 +47,10 @@ import java.util.Set;
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory.createAssigner;
 
-/** The enumerator class for pulsar source. */
+/** The enumerator class for the pulsar source. */
 @Internal
 public class PulsarSourceEnumerator
         implements SplitEnumerator<PulsarPartitionSplit, 
PulsarSourceEnumState> {
@@ -66,11 +69,31 @@ public class PulsarSourceEnumerator
     public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
             StartCursor startCursor,
+            StopCursor stopCursor,
+            RangeGenerator rangeGenerator,
+            Configuration configuration,
+            SourceConfiguration sourceConfiguration,
+            SplitEnumeratorContext<PulsarPartitionSplit> context) {
+        this(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                configuration,
+                sourceConfiguration,
+                context,
+                initialState());
+    }
+
+    public PulsarSourceEnumerator(
+            PulsarSubscriber subscriber,
+            StartCursor startCursor,
+            StopCursor stopCursor,
             RangeGenerator rangeGenerator,
             Configuration configuration,
             SourceConfiguration sourceConfiguration,
             SplitEnumeratorContext<PulsarPartitionSplit> context,
-            SplitAssigner splitAssigner) {
+            PulsarSourceEnumState enumState) {
         this.pulsarAdmin = createAdmin(configuration);
         this.subscriber = subscriber;
         this.startCursor = startCursor;
@@ -78,7 +101,7 @@ public class PulsarSourceEnumerator
         this.configuration = configuration;
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
-        this.splitAssigner = splitAssigner;
+        this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, 
context, enumState);
     }
 
     @Override
@@ -118,7 +141,12 @@ public class PulsarSourceEnumerator
 
         // If the failed subtask has already restarted, we need to assign 
pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
-            assignPendingPartitionSplits(singletonList(subtaskId));
+            LOG.debug(
+                    "Reader {} has been restarted after crashing, we will put 
splits back to it.",
+                    subtaskId);
+            // Reassign for all readers in case of adding splits after scale 
up/down.
+            List<Integer> readers = new 
ArrayList<>(context.registeredReaders().keySet());
+            assignPendingPartitionSplits(readers);
         }
     }
 
@@ -159,8 +187,8 @@ public class PulsarSourceEnumerator
     }
 
     /**
-     * Check if there's any partition changes within subscribed topic 
partitions fetched by worker
-     * thread, and convert them to splits the assign them to pulsar readers.
+     * Check if there are any partition changes within subscribed topic 
partitions fetched by worker
+     * thread, and convert them to splits, then assign them to pulsar readers.
      *
      * <p>NOTE: This method should only be invoked in the coordinator executor 
thread.
      *
@@ -234,9 +262,17 @@ public class PulsarSourceEnumerator
                 });
 
         // Assign splits to downstream readers.
-        
splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner
+                .createAssignment(pendingReaders)
+                .ifPresent(
+                        assignments -> {
+                            LOG.info(
+                                    "The split assignment results are: {}",
+                                    assignments.assignment());
+                            context.assignSplits(assignments);
+                        });
 
-        // If periodically partition discovery is disabled and the 
initializing discovery has done,
+        // If periodically partition discovery is turned off and the 
initializing discovery has done
         // signal NoMoreSplitsEvent to pending readers.
         for (Integer reader : pendingReaders) {
             if (splitAssigner.noMoreSplits(reader)) {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
index 087e96157d6..1b7b4a6f446 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -29,10 +29,7 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -40,27 +37,14 @@ import java.util.Set;
  * and {@link SubscriptionType#Key_Shared} subscriptions.
  */
 @Internal
-public class NonSharedSplitAssigner implements SplitAssigner {
-    private static final long serialVersionUID = 8412586087991597092L;
-
-    private final StopCursor stopCursor;
-    private final boolean enablePartitionDiscovery;
-
-    // These fields would be saved into checkpoint.
-
-    private final Set<TopicPartition> appendedPartitions;
-    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
-    private boolean initialized;
+class NonSharedSplitAssigner extends SplitAssignerBase {
 
     public NonSharedSplitAssigner(
             StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        this.stopCursor = stopCursor;
-        this.enablePartitionDiscovery = 
sourceConfiguration.isEnablePartitionDiscovery();
-        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
-        this.pendingPartitionSplits = 
sourceEnumState.getPendingPartitionSplits();
-        this.initialized = sourceEnumState.isInitialized();
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        super(stopCursor, enablePartitionDiscovery, context, enumState);
     }
 
     @Override
@@ -69,9 +53,13 @@ public class NonSharedSplitAssigner implements SplitAssigner 
{
 
         for (TopicPartition partition : fetchedPartitions) {
             if (!appendedPartitions.contains(partition)) {
-                pendingPartitionSplits.add(new PulsarPartitionSplit(partition, 
stopCursor));
                 appendedPartitions.add(partition);
                 newPartitions.add(partition);
+
+                // Calculate the reader id by the current parallelism.
+                int readerId = partitionOwner(partition);
+                PulsarPartitionSplit split = new 
PulsarPartitionSplit(partition, stopCursor);
+                addSplitToPendingList(readerId, split);
             }
         }
 
@@ -84,43 +72,41 @@ public class NonSharedSplitAssigner implements 
SplitAssigner {
 
     @Override
     public void addSplitsBack(List<PulsarPartitionSplit> splits, int 
subtaskId) {
-        pendingPartitionSplits.addAll(splits);
-    }
-
-    @Override
-    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
-            List<Integer> readers) {
-        if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
-            return Optional.empty();
+        for (PulsarPartitionSplit split : splits) {
+            int readerId = partitionOwner(split.getPartition());
+            addSplitToPendingList(readerId, split);
         }
-
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
-        List<PulsarPartitionSplit> partitionSplits = new 
ArrayList<>(pendingPartitionSplits);
-        int readerCount = readers.size();
-        for (int i = 0; i < partitionSplits.size(); i++) {
-            int index = i % readerCount;
-            Integer readerId = readers.get(index);
-            PulsarPartitionSplit split = partitionSplits.get(i);
-            assignMap.computeIfAbsent(readerId, id -> new 
ArrayList<>()).add(split);
-        }
-        pendingPartitionSplits.clear();
-
-        return Optional.of(new SplitsAssignment<>(assignMap));
     }
 
-    @Override
-    public boolean noMoreSplits(Integer reader) {
-        return !enablePartitionDiscovery && initialized && 
pendingPartitionSplits.isEmpty();
+    /**
+     * Returns the index of the target subtask that a specific partition 
should be assigned to. It's
+     * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
+     *
+     * <p>The resulting distribution of partition has the following contract:
+     *
+     * <ul>
+     *   <li>1. Uniformly distributed across subtasks.
+     *   <li>2. Partitions are round-robin distributed (strictly clockwise 
w.r.t. ascending subtask
+     *       indices) by using the partition id as the offset from a starting 
index (i.e., the index
+     *       of the subtask which partition 0 of the topic will be assigned 
to, determined using the
+     *       topic name).
+     * </ul>
+     *
+     * @param partition The Pulsar partition to assign.
+     * @return The id of the reader that owns this partition.
+     */
+    private int partitionOwner(TopicPartition partition) {
+        return calculatePartitionOwner(
+                partition.getTopic(), partition.getPartitionId(), 
context.currentParallelism());
     }
 
-    @Override
-    public PulsarSourceEnumState snapshotState() {
-        return new PulsarSourceEnumState(
-                appendedPartitions,
-                pendingPartitionSplits,
-                new HashMap<>(),
-                new HashMap<>(),
-                initialized);
+    @VisibleForTesting
+    static int calculatePartitionOwner(String topic, int partitionId, int 
parallelism) {
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
+        /*
+         * Here, the assumption is that the id of Pulsar partitions are always 
ascending starting from
+         * 0. Therefore, can be used directly as the offset clockwise from the 
start index.
+         */
+        return (startIndex + partitionId) % parallelism;
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
index 48d75c8dee3..845bfd2086a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
@@ -19,8 +19,7 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -29,38 +28,19 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
-/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */
+/** This assigner is used for {@link SubscriptionType#Shared} subscription. */
 @Internal
-public class SharedSplitAssigner implements SplitAssigner {
-    private static final long serialVersionUID = 8468503133499402491L;
-
-    private final StopCursor stopCursor;
-    private final boolean enablePartitionDiscovery;
-
-    // These fields would be saved into checkpoint.
-
-    private final Set<TopicPartition> appendedPartitions;
-    private final Map<Integer, Set<PulsarPartitionSplit>> 
sharedPendingPartitionSplits;
-    private final Map<Integer, Set<String>> readerAssignedSplits;
-    private boolean initialized;
+class SharedSplitAssigner extends SplitAssignerBase {
 
     public SharedSplitAssigner(
             StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        this.stopCursor = stopCursor;
-        this.enablePartitionDiscovery = 
sourceConfiguration.isEnablePartitionDiscovery();
-        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
-        this.sharedPendingPartitionSplits = 
sourceEnumState.getSharedPendingPartitionSplits();
-        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
-        this.initialized = sourceEnumState.isInitialized();
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        super(stopCursor, enablePartitionDiscovery, context, enumState);
     }
 
     @Override
@@ -68,9 +48,20 @@ public class SharedSplitAssigner implements SplitAssigner {
         List<TopicPartition> newPartitions = new ArrayList<>();
 
         for (TopicPartition partition : fetchedPartitions) {
+            boolean shouldAssign = false;
             if (!appendedPartitions.contains(partition)) {
                 appendedPartitions.add(partition);
                 newPartitions.add(partition);
+                shouldAssign = true;
+            }
+
+            // Reassign the incoming splits when restarting from state.
+            if (shouldAssign || !initialized) {
+                // Share the split to all the readers.
+                for (int i = 0; i < context.currentParallelism(); i++) {
+                    PulsarPartitionSplit split = new 
PulsarPartitionSplit(partition, stopCursor);
+                    addSplitToPendingList(i, split);
+                }
             }
         }
 
@@ -83,66 +74,15 @@ public class SharedSplitAssigner implements SplitAssigner {
 
     @Override
     public void addSplitsBack(List<PulsarPartitionSplit> splits, int 
subtaskId) {
-        Set<PulsarPartitionSplit> pendingPartitionSplits =
-                sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> 
new HashSet<>());
-        pendingPartitionSplits.addAll(splits);
-    }
-
-    @Override
-    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
-            List<Integer> readers) {
-        if (readers.isEmpty()) {
-            return Optional.empty();
-        }
-
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-        for (Integer reader : readers) {
-            Set<PulsarPartitionSplit> pendingSplits = 
sharedPendingPartitionSplits.remove(reader);
-            if (pendingSplits == null) {
-                pendingSplits = new HashSet<>();
-            }
-
-            Set<String> assignedSplits =
-                    readerAssignedSplits.computeIfAbsent(reader, r -> new 
HashSet<>());
-
+        if (splits.isEmpty()) {
+            // In case of a task failure, no splits will be put back to the 
enumerator.
             for (TopicPartition partition : appendedPartitions) {
-                String partitionName = partition.toString();
-                if (!assignedSplits.contains(partitionName)) {
-                    pendingSplits.add(new PulsarPartitionSplit(partition, 
stopCursor));
-                    assignedSplits.add(partitionName);
-                }
-            }
-
-            if (!pendingSplits.isEmpty()) {
-                assignMap.put(reader, new ArrayList<>(pendingSplits));
+                addSplitToPendingList(subtaskId, new 
PulsarPartitionSplit(partition, stopCursor));
             }
-        }
-
-        if (assignMap.isEmpty()) {
-            return Optional.empty();
         } else {
-            return Optional.of(new SplitsAssignment<>(assignMap));
+            for (PulsarPartitionSplit split : splits) {
+                addSplitToPendingList(subtaskId, split);
+            }
         }
     }
-
-    @Override
-    public boolean noMoreSplits(Integer reader) {
-        Set<PulsarPartitionSplit> pendingSplits = 
sharedPendingPartitionSplits.get(reader);
-        Set<String> assignedSplits = readerAssignedSplits.get(reader);
-
-        return !enablePartitionDiscovery
-                && initialized
-                && (pendingSplits == null || pendingSplits.isEmpty())
-                && (assignedSplits != null && assignedSplits.size() == 
appendedPartitions.size());
-    }
-
-    @Override
-    public PulsarSourceEnumState snapshotState() {
-        return new PulsarSourceEnumState(
-                appendedPartitions,
-                new HashSet<>(),
-                sharedPendingPartitionSplits,
-                readerAssignedSplits,
-                initialized);
-    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
index bc03f5103fd..c7343892c7e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -34,7 +33,7 @@ import java.util.Set;
  * readers and store all the state into checkpoint.
  */
 @Internal
-public interface SplitAssigner extends Serializable {
+public interface SplitAssigner {
 
     /**
      * Add the current available partitions into assigner.
@@ -54,8 +53,8 @@ public interface SplitAssigner extends Serializable {
     Optional<SplitsAssignment<PulsarPartitionSplit>> 
createAssignment(List<Integer> readers);
 
     /**
-     * It would return true only if periodically partition discovery is 
disabled, the initializing
-     * partition discovery has finished AND there is no pending splits for 
assignment.
+     * It would return true only if periodic partition discovery is turned 
off, the initial
+     * partition discovery has finished, AND there are no pending splits for 
assignment.
      */
     boolean noMoreSplits(Integer reader);
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
new file mode 100644
index 00000000000..32af433acbb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Common abstraction for split assigner. */
+abstract class SplitAssignerBase implements SplitAssigner {
+
+    protected final StopCursor stopCursor;
+    protected final boolean enablePartitionDiscovery;
+    protected final SplitEnumeratorContext<PulsarPartitionSplit> context;
+    protected final Set<TopicPartition> appendedPartitions;
+    protected final Map<Integer, Set<PulsarPartitionSplit>> 
pendingPartitionSplits;
+
+    protected boolean initialized;
+
+    protected SplitAssignerBase(
+            StopCursor stopCursor,
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = enablePartitionDiscovery;
+        this.context = context;
+        this.appendedPartitions = enumState.getAppendedPartitions();
+        this.pendingPartitionSplits = new 
HashMap<>(context.currentParallelism());
+        this.initialized = false;
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap =
+                new HashMap<>(pendingPartitionSplits.size());
+
+        for (Integer reader : readers) {
+            Set<PulsarPartitionSplit> splits = 
pendingPartitionSplits.remove(reader);
+            if (splits != null && !splits.isEmpty()) {
+                assignMap.put(reader, new ArrayList<>(splits));
+            }
+        }
+
+        if (assignMap.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new SplitsAssignment<>(assignMap));
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        return !enablePartitionDiscovery
+                && initialized
+                && !pendingPartitionSplits.containsKey(reader);
+    }
+
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(appendedPartitions);
+    }
+
+    /** Add split to pending lists. */
+    protected void addSplitToPendingList(int readerId, PulsarPartitionSplit 
split) {
+        Set<PulsarPartitionSplit> splits =
+                pendingPartitionSplits.computeIfAbsent(readerId, i -> new 
HashSet<>());
+        splits.add(split);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
index 3e6ebccb49b..02f8934a6bf 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -19,18 +19,14 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import static 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
-import static org.apache.pulsar.client.api.SubscriptionType.Failover;
-import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
 /** The factory for creating split assigner. */
 @Internal
 public final class SplitAssignerFactory {
@@ -39,27 +35,26 @@ public final class SplitAssignerFactory {
         // No public constructor.
     }
 
-    /** Create blank assigner. */
-    public static SplitAssigner create(
-            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
-        return create(stopCursor, sourceConfiguration, initialState());
-    }
-
-    /** Create assigner from checkpoint state. */
-    public static SplitAssigner create(
+    public static SplitAssigner createAssigner(
             StopCursor stopCursor,
             SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
         SubscriptionType subscriptionType = 
sourceConfiguration.getSubscriptionType();
-        if (subscriptionType == Exclusive
-                || subscriptionType == Failover
-                || subscriptionType == Key_Shared) {
-            return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, 
sourceEnumState);
-        } else if (subscriptionType == Shared) {
-            return new SharedSplitAssigner(stopCursor, sourceConfiguration, 
sourceEnumState);
-        } else {
-            throw new IllegalArgumentException(
-                    "We don't support this subscription type: " + 
subscriptionType);
+        boolean enablePartitionDiscovery = 
sourceConfiguration.isEnablePartitionDiscovery();
+
+        switch (subscriptionType) {
+            case Failover:
+            case Exclusive:
+            case Key_Shared:
+                return new NonSharedSplitAssigner(
+                        stopCursor, enablePartitionDiscovery, context, 
enumState);
+            case Shared:
+                return new SharedSplitAssigner(
+                        stopCursor, enablePartitionDiscovery, context, 
enumState);
+            default:
+                throw new IllegalArgumentException(
+                        "We don't support this subscription type: " + 
subscriptionType);
         }
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index c965ff962f8..55b55132625 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -25,6 +25,8 @@ import org.apache.pulsar.client.api.MessageId;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The class for defining the start or stop position. We only expose the 
constructor for end user.
  */
@@ -39,12 +41,16 @@ public final class CursorPosition implements Serializable {
     private final Long timestamp;
 
     public CursorPosition(MessageId messageId) {
+        checkNotNull(messageId, "Message id couldn't be null.");
+
         this.type = Type.MESSAGE_ID;
         this.messageId = messageId;
         this.timestamp = null;
     }
 
     public CursorPosition(Long timestamp) {
+        checkNotNull(timestamp, "Timestamp couldn't be null.");
+
         this.type = Type.TIMESTAMP;
         this.messageId = null;
         this.timestamp = timestamp;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
index c086a3fba68..d2662f06b0a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
@@ -26,11 +26,13 @@ import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 
 import org.apache.pulsar.client.api.Consumer;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 import static java.util.stream.Collectors.toCollection;
@@ -55,13 +57,15 @@ public class PulsarUnorderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T
         return fetchers.values().stream()
                 .map(SplitFetcher::getSplitReader)
                 .map(splitReader -> snapshotReader(checkpointId, splitReader))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
                 .collect(toCollection(() -> new ArrayList<>(fetchers.size())));
     }
 
-    private PulsarPartitionSplit snapshotReader(
+    private Optional<PulsarPartitionSplit> snapshotReader(
             long checkpointId, SplitReader<PulsarMessage<T>, 
PulsarPartitionSplit> splitReader) {
         return ((PulsarUnorderedPartitionSplitReader<T>) splitReader)
                 .snapshotState(checkpointId)
-                .toPulsarPartitionSplit();
+                .map(PulsarPartitionSplitState::toPulsarPartitionSplit);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index 8d18b1d6e8d..cf6de503d74 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -59,6 +59,8 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     private final SortedMap<Long, List<TxnID>> transactionsToCommit;
     private final List<TxnID> transactionsOfFinishedSplits;
 
+    private boolean started = false;
+
     public PulsarUnorderedSourceReader(
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
             Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
splitReaderSupplier,
@@ -82,6 +84,38 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
         this.transactionsOfFinishedSplits = Collections.synchronizedList(new 
ArrayList<>());
     }
 
+    @Override
+    public void start() {
+        this.started = true;
+        super.start();
+    }
+
+    @Override
+    public void addSplits(List<PulsarPartitionSplit> splits) {
+        if (started) {
+            // We only accept splits after this reader is started and 
registered to the pipeline.
+            // This would ignore the splits from the state.
+            super.addSplits(splits);
+        } else {
+            // Abort the pending transaction in this split.
+            for (PulsarPartitionSplit split : splits) {
+                LOG.info("Ignore the split {} saved in checkpoint.", split);
+
+                TxnID transactionId = split.getUncommittedTransactionId();
+                if (transactionId != null && coordinatorClient != null) {
+                    try {
+                        coordinatorClient.abort(transactionId);
+                    } catch (Exception e) {
+                        LOG.debug(
+                                "Error in aborting transaction {} from the 
checkpoint",
+                                transactionId,
+                                e);
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     protected void onSplitFinished(Map<String, PulsarPartitionSplitState> 
finishedSplitIds) {
         // We don't require new splits, all the splits are pre-assigned by 
source enumerator.
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index d6548dc7070..b527e0dde06 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -164,24 +164,21 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
         List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
         Preconditions.checkArgument(
                 newSplits.size() == 1, "This pulsar split reader only support 
one split.");
-        PulsarPartitionSplit newSplit = newSplits.get(0);
+        this.registeredSplit = newSplits.get(0);
 
         // Open stop cursor.
-        newSplit.open(pulsarAdmin);
+        registeredSplit.open(pulsarAdmin);
 
         // Before creating the consumer.
-        beforeCreatingConsumer(newSplit);
+        beforeCreatingConsumer(registeredSplit);
 
         // Create pulsar consumer.
-        Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
+        this.pulsarConsumer = createPulsarConsumer(registeredSplit);
 
         // After creating the consumer.
-        afterCreatingConsumer(newSplit, consumer);
+        afterCreatingConsumer(registeredSplit, pulsarConsumer);
 
-        LOG.info("Register split {} consumer for current reader.", newSplit);
-
-        this.registeredSplit = newSplit;
-        this.pulsarConsumer = consumer;
+        LOG.info("Register split {} consumer for current reader.", 
registeredSplit);
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 630e2567dbc..4f0d444061b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -149,7 +150,11 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
         }
     }
 
-    public PulsarPartitionSplitState snapshotState(long checkpointId) {
+    public Optional<PulsarPartitionSplitState> snapshotState(long 
checkpointId) {
+        if (registeredSplit == null) {
+            return Optional.empty();
+        }
+
         PulsarPartitionSplitState state = new 
PulsarPartitionSplitState(registeredSplit);
 
         // Avoiding NP problem when Pulsar don't get the message before Flink 
checkpoint.
@@ -159,7 +164,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
             state.setUncommittedTransactionId(txnID);
         }
 
-        return state;
+        return Optional.of(state);
     }
 
     private Transaction newTransaction() {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
index 5f18e8f5131..54687a3277e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
@@ -18,23 +18,17 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator;
 
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.Set;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -48,29 +42,13 @@ class PulsarSourceEnumStateSerializerTest {
                 Sets.newHashSet(
                         new TopicPartition(randomAlphabetic(10), 2, new 
TopicRange(1, 30)),
                         new TopicPartition(randomAlphabetic(10), 1, 
createFullRange()));
-        Set<PulsarPartitionSplit> splits =
-                Collections.singleton(
-                        new PulsarPartitionSplit(
-                                new TopicPartition(randomAlphabetic(10), 10, 
createFullRange()),
-                                StopCursor.defaultStopCursor(),
-                                newMessageId(100L, 23L, 44),
-                                null));
-        Map<Integer, Set<PulsarPartitionSplit>> shared = 
Collections.singletonMap(5, splits);
-        Map<Integer, Set<String>> mapping =
-                ImmutableMap.of(
-                        1, Sets.newHashSet(randomAlphabetic(10), 
randomAlphabetic(10)),
-                        2, Sets.newHashSet(randomAlphabetic(10), 
randomAlphabetic(10)));
 
-        PulsarSourceEnumState state =
-                new PulsarSourceEnumState(partitions, splits, shared, mapping, 
true);
+        PulsarSourceEnumState state = new PulsarSourceEnumState(partitions);
 
         byte[] bytes = INSTANCE.serialize(state);
         PulsarSourceEnumState state1 = 
INSTANCE.deserialize(INSTANCE.getVersion(), bytes);
 
         assertEquals(state.getAppendedPartitions(), 
state1.getAppendedPartitions());
-        assertEquals(state.getPendingPartitionSplits(), 
state1.getPendingPartitionSplits());
-        assertEquals(state.getReaderAssignedSplits(), 
state1.getReaderAssignedSplits());
-        assertEquals(state.isInitialized(), state1.isInitialized());
 
         assertNotSame(state, state1);
     }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 44bd9ae52f1..325919b00bc 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -19,29 +19,25 @@
 package org.apache.flink.connector.pulsar.source.enumerator;
 
 import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
-import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -49,14 +45,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
 import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -66,6 +64,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     private static final int NUM_SUBTASKS = 3;
     private static final int READER0 = 0;
     private static final int READER1 = 1;
+    private static final int READER2 = 2;
     private static final int PARTITION_DISCOVERY_CALLABLE_INDEX = 0;
     private static final boolean ENABLE_PERIODIC_PARTITION_DISCOVERY = true;
     private static final boolean DISABLE_PERIODIC_PARTITION_DISCOVERY = false;
@@ -75,13 +74,13 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) 
throws Exception {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context,
                                 DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
@@ -100,13 +99,13 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void startWithPeriodicPartitionDiscovery(SubscriptionType 
subscriptionType) throws Exception {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
@@ -123,26 +122,27 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void discoverPartitionsTriggersAssignments(SubscriptionType 
subscriptionType) throws Throwable {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context,
                                 DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             enumerator.start();
 
-            // register reader 0, 1
+            // register reader 0, 1, 2
             registerReader(context, enumerator, READER0);
             registerReader(context, enumerator, READER1);
+            registerReader(context, enumerator, READER2);
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // Run the partition discover callable and check the partition 
assignment.
             runOneTimePartitionDiscovery(context);
-            verifyLastReadersAssignments(subscriptionType, context, 
prexistingTopics, 1);
+            verifyAllReaderAssignments(subscriptionType, context, 
preexistingTopics, 1);
         }
     }
 
@@ -151,9 +151,9 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void discoverPartitionsPeriodically(SubscriptionType subscriptionType) 
throws Throwable {
-        String dynamicTopic = randomAlphabetic(10);
-        Set<String> prexistingTopics = setupPreexistingTopics();
-        Set<String> topicsToSubscribe = new HashSet<>(prexistingTopics);
+        String dynamicTopic = "topic3-" + randomAlphabetic(10);
+        Set<String> preexistingTopics = setupPreexistingTopics();
+        Set<String> topicsToSubscribe = new HashSet<>(preexistingTopics);
         topicsToSubscribe.add(dynamicTopic);
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
@@ -165,35 +165,28 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             testRegisterReadersForPreexistingTopics(
-                    subscriptionType, prexistingTopics, context, enumerator);
+                    subscriptionType, preexistingTopics, context, enumerator);
 
             // invoke partition discovery callable again and there should be 
no new assignments.
             runPeriodicPartitionDiscovery(context);
 
-            int expectedSplitsAssignmentSequenceSize =
-                    subscriptionType == SubscriptionType.Failover ? 1 : 2;
-
             assertThat(context.getSplitsAssignmentSequence())
                     .as("No new assignments should be made because there is no 
partition change")
-                    .hasSize(expectedSplitsAssignmentSequenceSize);
+                    .hasSize(3);
 
-            // create the dynamic topic.
-            operator().createTopic(dynamicTopic, 
PulsarRuntimeOperator.DEFAULT_PARTITIONS);
+            // Create the dynamic topic.
+            operator().createTopic(dynamicTopic, DEFAULT_PARTITIONS);
 
-            // invoke partition discovery callable again.
+            // Invoke partition discovery callable again.
             while (true) {
                 runPeriodicPartitionDiscovery(context);
-                if (context.getSplitsAssignmentSequence().size() < 2) {
+                if (context.getSplitsAssignmentSequence().size() < 4) {
                     sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
                 } else {
                     break;
                 }
             }
-            verifyLastReadersAssignments(
-                    subscriptionType,
-                    context,
-                    Collections.singleton(dynamicTopic),
-                    expectedSplitsAssignmentSequenceSize + 1);
+            verifyAllReaderAssignments(subscriptionType, context, 
topicsToSubscribe, 4);
         }
     }
 
@@ -202,57 +195,51 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void addSplitsBack(SubscriptionType subscriptionType) throws Throwable {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             testRegisterReadersForPreexistingTopics(
-                    subscriptionType, prexistingTopics, context, enumerator);
+                    subscriptionType, preexistingTopics, context, enumerator);
 
             // Simulate a reader failure.
             context.unregisterReader(READER0);
             enumerator.addSplitsBack(
                     
context.getSplitsAssignmentSequence().get(0).assignment().get(READER0),
                     READER0);
-            int expectedSplitsAssignmentSequenceSize =
-                    subscriptionType == SubscriptionType.Failover ? 1 : 2;
             assertThat(context.getSplitsAssignmentSequence())
                     .as("The added back splits should have not been assigned")
-                    .hasSize(expectedSplitsAssignmentSequenceSize);
+                    .hasSize(3);
 
             // Simulate a reader recovery.
             registerReader(context, enumerator, READER0);
-            verifyLastReadersAssignments(
-                    subscriptionType,
-                    context,
-                    prexistingTopics,
-                    expectedSplitsAssignmentSequenceSize + 1);
+            verifyAllReaderAssignments(subscriptionType, context, 
preexistingTopics, 3 + 1);
         }
     }
 
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover"})
+            names = {"Failover", "Shared"})
     void workWithPreexistingAssignments(SubscriptionType subscriptionType) 
throws Throwable {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         PulsarSourceEnumState preexistingAssignments;
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context1 =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context1,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
             testRegisterReadersForPreexistingTopics(
-                    subscriptionType, prexistingTopics, context1, enumerator);
+                    subscriptionType, preexistingTopics, context1, enumerator);
             preexistingAssignments =
                     
asEnumState(context1.getSplitsAssignmentSequence().get(0).assignment());
         }
@@ -262,7 +249,7 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
                                 subscriptionType,
-                                prexistingTopics,
+                                preexistingTopics,
                                 context2,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY,
                                 preexistingAssignments)) {
@@ -270,7 +257,11 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             runPeriodicPartitionDiscovery(context2);
 
             registerReader(context2, enumerator, READER0);
-            verifyLastReadersAssignments(subscriptionType, context2, 
prexistingTopics, 1);
+            if (subscriptionType == SubscriptionType.Shared) {
+                verifyAllReaderAssignments(subscriptionType, context2, 
preexistingTopics, 1);
+            } else {
+                assertThat(context2.getSplitsAssignmentSequence()).isEmpty();
+            }
         }
     }
 
@@ -279,11 +270,11 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             value = SubscriptionType.class,
             names = {"Failover", "Shared"})
     void snapshotState(SubscriptionType subscriptionType) throws Throwable {
-        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
-                        createEnumerator(subscriptionType, prexistingTopics, 
context, false)) {
+                        createEnumerator(subscriptionType, preexistingTopics, 
context, false)) {
             enumerator.start();
 
             // No reader is registered, so the state should be empty
@@ -297,19 +288,18 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             // The state should contain splits assigned to READER0 and READER1
             final PulsarSourceEnumState state2 = enumerator.snapshotState(1L);
             verifySplitAssignmentWithPartitions(
-                    getExpectedTopicPartitions(prexistingTopics), 
state2.getAppendedPartitions());
+                    getExpectedTopicPartitions(preexistingTopics), 
state2.getAppendedPartitions());
         }
     }
 
     private Set<String> setupPreexistingTopics() {
-        String topic1 = randomAlphabetic(10);
-        String topic2 = randomAlphabetic(10);
+        String topic1 = "topic1-" + randomAlphabetic(10);
+        String topic2 = "topic2-" + randomAlphabetic(10);
+
         operator().setupTopic(topic1);
         operator().setupTopic(topic2);
-        Set<String> preexistingTopics = new HashSet<>();
-        preexistingTopics.add(topic1);
-        preexistingTopics.add(topic2);
-        return preexistingTopics;
+
+        return ImmutableSet.of(topic1, topic2);
     }
 
     private void testRegisterReadersForPreexistingTopics(
@@ -326,14 +316,14 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
 
         // Run the partition discover callable and check the partition 
assignment.
         runPeriodicPartitionDiscovery(context);
-        verifyLastReadersAssignments(subscriptionType, context, topics, 1);
+        if (subscriptionType == SubscriptionType.Shared) {
+            verifyAllReaderAssignments(subscriptionType, context, topics, 1);
+        }
 
         registerReader(context, enumerator, READER1);
+        registerReader(context, enumerator, READER2);
 
-        int expectedSplitsAssignmentSequenceSize =
-                subscriptionType == SubscriptionType.Failover ? 1 : 2;
-        verifyLastReadersAssignments(
-                subscriptionType, context, topics, 
expectedSplitsAssignmentSequenceSize);
+        verifyAllReaderAssignments(subscriptionType, context, topics, 3);
     }
 
     private PulsarSourceEnumerator createEnumerator(
@@ -341,19 +331,12 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
             Set<String> topics,
             MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery) {
-        PulsarSourceEnumState sourceEnumState =
-                new PulsarSourceEnumState(
-                        Sets.newHashSet(),
-                        Sets.newHashSet(),
-                        Maps.newHashMap(),
-                        Maps.newHashMap(),
-                        false);
         return createEnumerator(
                 subscriptionType,
                 topics,
                 enumContext,
                 enablePeriodicPartitionDiscovery,
-                sourceEnumState);
+                initialState());
     }
 
     private PulsarSourceEnumerator createEnumerator(
@@ -377,51 +360,51 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
         } else {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
         }
-        SourceConfiguration sourceConfiguration = new 
SourceConfiguration(configuration);
 
-        SplitAssigner assigner =
-                SplitAssignerFactory.create(latest(), sourceConfiguration, 
sourceEnumState);
         return new PulsarSourceEnumerator(
                 subscriber,
                 StartCursor.earliest(),
+                StopCursor.latest(),
                 new FullRangeGenerator(),
                 configuration,
-                sourceConfiguration,
+                new SourceConfiguration(configuration),
                 enumContext,
-                assigner);
+                sourceEnumState);
     }
 
     private void registerReader(
             MockSplitEnumeratorContext<PulsarPartitionSplit> context,
             PulsarSourceEnumerator enumerator,
             int reader) {
-        context.registerReader(new ReaderInfo(reader, "testing location "));
+        context.registerReader(new ReaderInfo(reader, "testing location"));
         enumerator.addReader(reader);
     }
 
-    private void verifyLastReadersAssignments(
+    private void verifyAllReaderAssignments(
             SubscriptionType subscriptionType,
             MockSplitEnumeratorContext<PulsarPartitionSplit> context,
             Set<String> topics,
             int expectedAssignmentSeqSize) {
         
assertThat(context.getSplitsAssignmentSequence()).hasSize(expectedAssignmentSeqSize);
-        verifyAssignments(
-                subscriptionType,
-                getExpectedTopicPartitions(topics),
-                context.getSplitsAssignmentSequence()
-                        .get(expectedAssignmentSeqSize - 1)
-                        .assignment());
-    }
+        // Merge the assignments into one
+        List<SplitsAssignment<PulsarPartitionSplit>> sequence =
+                context.getSplitsAssignmentSequence();
+        Map<Integer, Set<PulsarPartitionSplit>> assignments = new HashMap<>();
+
+        for (int i = 0; i < expectedAssignmentSeqSize; i++) {
+            Map<Integer, List<PulsarPartitionSplit>> assignment = 
sequence.get(i).assignment();
+            assignment.forEach(
+                    (key, value) ->
+                            assignments.computeIfAbsent(key, k -> new 
HashSet<>()).addAll(value));
+        }
 
-    private void verifyAssignments(
-            SubscriptionType subscriptionType,
-            Set<TopicPartition> expectedTopicPartitions,
-            Map<Integer, List<PulsarPartitionSplit>> actualAssignments) {
+        // Compare assigned partitions with desired partitions.
+        Set<TopicPartition> expectedTopicPartitions = 
getExpectedTopicPartitions(topics);
         if (subscriptionType == SubscriptionType.Failover) {
-            int actualSize = 
actualAssignments.values().stream().mapToInt(List::size).sum();
+            int actualSize = 
assignments.values().stream().mapToInt(Set::size).sum();
             assertThat(actualSize).isEqualTo(expectedTopicPartitions.size());
         } else if (subscriptionType == SubscriptionType.Shared) {
-            actualAssignments
+            assignments
                     .values()
                     .forEach(
                             (splits) -> 
assertThat(splits).hasSize(expectedTopicPartitions.size()));
@@ -431,8 +414,8 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
     private Set<TopicPartition> getExpectedTopicPartitions(Set<String> topics) 
{
         Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topicName : topics) {
-            for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) 
{
-                allPartitions.add(new TopicPartition(topicName, i, 
TopicRange.createFullRange()));
+            for (int i = 0; i < DEFAULT_PARTITIONS; i++) {
+                allPartitions.add(new TopicPartition(topicName, i, 
createFullRange()));
             }
         }
         return allPartitions;
@@ -443,32 +426,16 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
         assertThat(actualTopicPartitions).isEqualTo(expectedAssignment);
     }
 
-    // this method only works for non Shared Mode
+    // this method only works for non-Shared Mode
     private PulsarSourceEnumState asEnumState(
             Map<Integer, List<PulsarPartitionSplit>> assignments) {
-        Set<TopicPartition> appendedPartitions = new HashSet<>();
-        Set<PulsarPartitionSplit> pendingPartitionSplits = new HashSet<>();
-        Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits = 
new HashMap<>();
-        Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>();
-        boolean initialized = false;
-
-        assignments
-                .values()
-                .forEach(
-                        splits -> {
-                            appendedPartitions.addAll(
-                                    splits.stream()
-                                            
.map(PulsarPartitionSplit::getPartition)
-                                            .collect(Collectors.toList()));
-                            pendingPartitionSplits.addAll(splits);
-                        });
-
-        return new PulsarSourceEnumState(
-                appendedPartitions,
-                pendingPartitionSplits,
-                sharedPendingPartitionSplits,
-                readerAssignedSplits,
-                initialized);
+        Set<TopicPartition> appendedPartitions =
+                assignments.values().stream()
+                        .flatMap(List::stream)
+                        .map(PulsarPartitionSplit::getPartition)
+                        .collect(toSet());
+
+        return new PulsarSourceEnumState(appendedPartitions);
     }
 
     private void runOneTimePartitionDiscovery(
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
index 2e9ada3b741..58f8d8fc51c 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -18,47 +18,56 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Collections.singletonList;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.assigner.NonSharedSplitAssigner.calculatePartitionOwner;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for {@link NonSharedSplitAssigner}. */
-class NonSharedSplitAssignerTest extends 
SplitAssignerTestBase<NonSharedSplitAssigner> {
+class NonSharedSplitAssignerTest extends SplitAssignerTestBase {
 
     @Test
     void noMoreSplits() {
-        NonSharedSplitAssigner assigner = splitAssigner(true);
+        SplitAssigner assigner = splitAssigner(true, 4);
         assertFalse(assigner.noMoreSplits(3));
 
-        assigner = splitAssigner(false);
+        assigner = splitAssigner(false, 4);
         assertFalse(assigner.noMoreSplits(3));
 
-        assigner.registerTopicPartitions(createPartitions("f", 8));
-        assertFalse(assigner.noMoreSplits(3));
+        Set<TopicPartition> partitions = 
createPartitions("persistent://public/default/f", 8);
+        int owner = calculatePartitionOwner("persistent://public/default/f", 
8, 4);
+
+        assigner.registerTopicPartitions(partitions);
+        assertFalse(assigner.noMoreSplits(owner));
 
-        assigner.createAssignment(singletonList(1));
-        assertTrue(assigner.noMoreSplits(1));
-        assertTrue(assigner.noMoreSplits(3));
+        assigner.createAssignment(singletonList(owner));
+        assertTrue(assigner.noMoreSplits(owner));
     }
 
     @Test
     void partitionsAssignment() {
-        NonSharedSplitAssigner assigner = splitAssigner(true);
-        assigner.registerTopicPartitions(createPartitions("d", 4));
-        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+        SplitAssigner assigner = splitAssigner(true, 4);
+        
assigner.registerTopicPartitions(createPartitions("persistent://public/default/d",
 4));
+        int owner = calculatePartitionOwner("persistent://public/default/d", 
4, 4);
+        List<Integer> readers = Arrays.asList(owner, owner + 1);
 
         // Assignment with initial states.
         Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
@@ -66,30 +75,39 @@ class NonSharedSplitAssignerTest extends 
SplitAssignerTestBase<NonSharedSplitAss
         assertThat(assignment).isPresent();
         assertThat(assignment.get().assignment()).hasSize(1);
 
-        // Reassignment with same readers.
+        // Reassignment with the same readers.
         assignment = assigner.createAssignment(readers);
         assertThat(assignment).isNotPresent();
 
         // Register new partition and assign.
-        assigner.registerTopicPartitions(createPartitions("e", 5));
-        assigner.registerTopicPartitions(createPartitions("f", 1));
-        assigner.registerTopicPartitions(createPartitions("g", 3));
-        assigner.registerTopicPartitions(createPartitions("h", 4));
+        
assigner.registerTopicPartitions(createPartitions("persistent://public/default/e",
 5));
+        
assigner.registerTopicPartitions(createPartitions("persistent://public/default/f",
 1));
+        
assigner.registerTopicPartitions(createPartitions("persistent://public/default/g",
 3));
+        
assigner.registerTopicPartitions(createPartitions("persistent://public/default/h",
 4));
+
+        Set<Integer> owners = new HashSet<>();
+        owners.add(calculatePartitionOwner("persistent://public/default/e", 5, 
4));
+        owners.add(calculatePartitionOwner("persistent://public/default/f", 1, 
4));
+        owners.add(calculatePartitionOwner("persistent://public/default/g", 3, 
4));
+        owners.add(calculatePartitionOwner("persistent://public/default/h", 4, 
4));
+        readers = new ArrayList<>(owners);
+
         assignment = assigner.createAssignment(readers);
         assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(4);
+        assertThat(assignment.get().assignment()).hasSize(readers.size());
 
         // Assign to new readers.
-        readers = Arrays.asList(2, 4, 6, 8);
+        readers = Collections.singletonList(5);
         assignment = assigner.createAssignment(readers);
         assertThat(assignment).isNotPresent();
     }
 
     @Override
-    protected NonSharedSplitAssigner createAssigner(
+    protected SplitAssigner createAssigner(
             StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, 
sourceEnumState);
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        return new NonSharedSplitAssigner(stopCursor, 
enablePartitionDiscovery, context, enumState);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
index 91584b87688..68f73b595ae 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.junit.jupiter.api.Test;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -36,14 +38,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for {@link SharedSplitAssigner}. */
-class SharedSplitAssignerTest extends 
SplitAssignerTestBase<SharedSplitAssigner> {
+class SharedSplitAssignerTest extends SplitAssignerTestBase {
 
     @Test
     void noMoreSplits() {
-        SharedSplitAssigner assigner = splitAssigner(true);
+        SplitAssigner assigner = splitAssigner(true, 4);
         assertFalse(assigner.noMoreSplits(3));
 
-        assigner = splitAssigner(false);
+        assigner = splitAssigner(false, 4);
         assertFalse(assigner.noMoreSplits(3));
 
         assigner.registerTopicPartitions(createPartitions("f", 8));
@@ -59,7 +61,7 @@ class SharedSplitAssignerTest extends 
SplitAssignerTestBase<SharedSplitAssigner>
 
     @Test
     void partitionsAssignment() {
-        SharedSplitAssigner assigner = splitAssigner(true);
+        SplitAssigner assigner = splitAssigner(true, 8);
         assigner.registerTopicPartitions(createPartitions("d", 4));
         List<Integer> readers = Arrays.asList(1, 3, 5, 7);
 
@@ -80,7 +82,7 @@ class SharedSplitAssignerTest extends 
SplitAssignerTestBase<SharedSplitAssigner>
         assertThat(assignment.get().assignment()).hasSize(4);
 
         // Assign to new readers.
-        readers = Arrays.asList(2, 4, 6, 8);
+        readers = Arrays.asList(0, 2, 4, 6);
         assignment = assigner.createAssignment(readers);
         assertThat(assignment).isPresent();
         assertThat(assignment.get().assignment())
@@ -88,11 +90,32 @@ class SharedSplitAssignerTest extends 
SplitAssignerTestBase<SharedSplitAssigner>
                 .allSatisfy((k, v) -> assertThat(v).hasSize(2));
     }
 
+    @Test
+    void reassignSplitsAfterRestarting() {
+        SplitAssigner assigner = splitAssigner(true, 8);
+        Set<TopicPartition> partitions = createPartitions("d", 4);
+        assigner.registerTopicPartitions(partitions);
+        List<Integer> readers = Arrays.asList(0, 1, 2);
+
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(3);
+
+        // Create a new split assigner with the same state.
+        SplitAssigner assigner1 = splitAssigner(true, 8, partitions);
+        assigner1.registerTopicPartitions(partitions);
+        assignment = assigner1.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(3);
+    }
+
     @Override
-    protected SharedSplitAssigner createAssigner(
+    protected SplitAssigner createAssigner(
             StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        return new SharedSplitAssigner(stopCursor, sourceConfiguration, 
sourceEnumState);
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        return new SharedSplitAssigner(stopCursor, enablePartitionDiscovery, 
context, enumState);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
index 65094014720..e9b7be52def 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
@@ -18,17 +18,19 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -36,18 +38,20 @@ import java.util.Set;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
 import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test utils for split assigners. */
-abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger {
+abstract class SplitAssignerTestBase extends TestLogger {
+
+    private static final List<MockSplitEnumeratorContext<PulsarPartitionSplit>> enumeratorContexts =
+            new ArrayList<>();
 
     @Test
     void registerTopicPartitionsWillOnlyReturnNewPartitions() {
-        T assigner = splitAssigner(true);
+        SplitAssigner assigner = splitAssigner(true, 4);
 
         Set<TopicPartition> partitions = createPartitions("persistent://public/default/a", 1);
         List<TopicPartition> newPartitions = assigner.registerTopicPartitions(partitions);
@@ -72,7 +76,7 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger
 
     @Test
     void noReadersProvideForAssignment() {
-        T assigner = splitAssigner(false);
+        SplitAssigner assigner = splitAssigner(false, 4);
         assigner.registerTopicPartitions(createPartitions("c", 5));
 
         Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
@@ -82,7 +86,7 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger
 
     @Test
     void noPartitionsProvideForAssignment() {
-        T assigner = splitAssigner(true);
+        SplitAssigner assigner = splitAssigner(true, 4);
         Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
                 assigner.createAssignment(singletonList(4));
         assertThat(assignment).isNotPresent();
@@ -93,21 +97,32 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger
         return singleton(p1);
     }
 
-    protected T splitAssigner(boolean discovery) {
-        Configuration configuration = new Configuration();
-
-        if (discovery) {
-            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 1000L);
-        } else {
-            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
-        }
+    protected SplitAssigner splitAssigner(boolean discovery, int parallelism) {
+        MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+        enumeratorContexts.add(context);
+        return createAssigner(defaultStopCursor(), discovery, context, initialState());
+    }
 
-        SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
-        return createAssigner(defaultStopCursor(), sourceConfiguration, initialState());
+    protected SplitAssigner splitAssigner(
+            boolean discovery, int parallelism, Set<TopicPartition> partitions) {
+        MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+        enumeratorContexts.add(context);
+        return createAssigner(
+                defaultStopCursor(), discovery, context, new PulsarSourceEnumState(partitions));
     }
 
-    protected abstract T createAssigner(
+    protected abstract SplitAssigner createAssigner(
             StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState);
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState);
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        for (MockSplitEnumeratorContext<PulsarPartitionSplit> context : enumeratorContexts) {
+            context.close();
+        }
+    }
 }

Reply via email to