This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0a8827ffc [FLINK-38564][connector] FLIP-537: Enumerator maintains 
global splits distribution for reassignment
7b0a8827ffc is described below

commit 7b0a8827ffc111fcdd714682fb1c66be9d5d981d
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Oct 30 20:55:12 2025 +0800

    [FLINK-38564][connector] FLIP-537: Enumerator maintains global splits 
distribution for reassignment
    
    [FLINK-38564][connector] FLIP-537: Enumerator maintains global splits 
distribution for reassignment.
    
    This closes #27149.
---
 .../flink/api/connector/source/ReaderInfo.java     |  44 ++++++--
 .../SupportsSplitReassignmentOnRecovery.java       |  29 +++++
 .../source/mocks/MockSplitEnumerator.java          |  87 +++++++++------
 .../source/coordinator/SourceCoordinator.java      |  11 +-
 .../coordinator/SourceCoordinatorContext.java      |   6 +-
 .../source/event/ReaderRegistrationEvent.java      |  47 +++++++-
 .../streaming/api/operators/SourceOperator.java    |  19 +++-
 .../api/operators/SourceOperatorFactory.java       |  10 +-
 .../coordinator/SourceCoordinatorContextTest.java  |  15 +++
 .../source/coordinator/SourceCoordinatorTest.java  | 121 +++++++++++++++++++++
 .../coordinator/SourceCoordinatorTestBase.java     |  17 +++
 .../SourceOperatorSplitWatermarkAlignmentTest.java |   6 +-
 .../api/operators/SourceOperatorTest.java          |  66 +++++++----
 .../api/operators/SourceOperatorTestContext.java   |   8 +-
 .../operators/SourceOperatorWatermarksTest.java    |   3 +-
 .../operators/source/TestingSourceOperator.java    |  24 +++-
 16 files changed, 424 insertions(+), 89 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index 7a93c1c7549..27e6f871f4e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,9 +21,16 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.Public;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
-/** A container class hosting the information of a {@link SourceReader}. */
+/**
+ * A container class hosting the information of a {@link SourceReader}.
+ *
+ * <p>The {@code reportedSplitsOnRegistration} can only be provided when the 
source implements
+ * {@link SupportsSplitReassignmentOnRecovery}.
+ */
 @Public
 public final class ReaderInfo implements Serializable {
 
@@ -31,10 +38,27 @@ public final class ReaderInfo implements Serializable {
 
     private final int subtaskId;
     private final String location;
+    private final List<SourceSplit> reportedSplitsOnRegistration;
 
     public ReaderInfo(int subtaskId, String location) {
+        this(subtaskId, location, Collections.emptyList());
+    }
+
+    ReaderInfo(int subtaskId, String location, List<SourceSplit> splits) {
         this.subtaskId = subtaskId;
         this.location = location;
+        this.reportedSplitsOnRegistration = splits;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <SplitT extends SourceSplit> ReaderInfo createReaderInfo(
+            int subtaskId, String location, List<SplitT> splits) {
+        return new ReaderInfo(subtaskId, location, (List<SourceSplit>) splits);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <SplitT extends SourceSplit> List<SplitT> 
getReportedSplitsOnRegistration() {
+        return (List<SplitT>) reportedSplitsOnRegistration;
     }
 
     /**
@@ -52,16 +76,18 @@ public final class ReaderInfo implements Serializable {
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(subtaskId, location);
+    public boolean equals(Object o) {
+        if (!(o instanceof ReaderInfo)) {
+            return false;
+        }
+        ReaderInfo that = (ReaderInfo) o;
+        return subtaskId == that.subtaskId
+                && Objects.equals(location, that.location)
+                && Objects.equals(reportedSplitsOnRegistration, 
that.reportedSplitsOnRegistration);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof ReaderInfo)) {
-            return false;
-        }
-        ReaderInfo other = (ReaderInfo) obj;
-        return subtaskId == other.subtaskId && location.equals(other.location);
+    public int hashCode() {
+        return Objects.hash(subtaskId, location, reportedSplitsOnRegistration);
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
new file mode 100644
index 00000000000..225eb32dc66
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A decorative interface for {@link Source}. Implementing this interface 
indicates that the source
+ * operator needs to report splits to the enumerator on start up and receive 
reassignment on
+ * recovery.
+ */
+@PublicEvolving
+public interface SupportsSplitReassignmentOnRecovery {}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 7a3763bc5f9..2adc79360f3 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.connector.source.mocks;
 
+import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -28,20 +29,20 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 /** A mock {@link SplitEnumerator} for unit tests. */
 public class MockSplitEnumerator
         implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, 
SupportsBatchSnapshot {
-    private final SortedSet<MockSourceSplit> unassignedSplits;
+    private final Map<Integer, Set<MockSourceSplit>> pendingSplitAssignment;
+    private final Map<String, Integer> globalSplitAssignment;
     private final SplitEnumeratorContext<MockSourceSplit> enumContext;
     private final List<SourceEvent> handledSourceEvent;
     private final List<Long> successfulCheckpoints;
@@ -50,22 +51,24 @@ public class MockSplitEnumerator
 
     public MockSplitEnumerator(int numSplits, 
SplitEnumeratorContext<MockSourceSplit> enumContext) {
         this(new HashSet<>(), enumContext);
+        List<MockSourceSplit> unassignedSplits = new ArrayList<>();
         for (int i = 0; i < numSplits; i++) {
             unassignedSplits.add(new MockSourceSplit(i));
         }
+        recalculateAssignments(unassignedSplits);
     }
 
     public MockSplitEnumerator(
             Set<MockSourceSplit> unassignedSplits,
             SplitEnumeratorContext<MockSourceSplit> enumContext) {
-        this.unassignedSplits =
-                new TreeSet<>(Comparator.comparingInt(o -> 
Integer.parseInt(o.splitId())));
-        this.unassignedSplits.addAll(unassignedSplits);
+        this.pendingSplitAssignment = new HashMap<>();
+        this.globalSplitAssignment = new HashMap<>();
         this.enumContext = enumContext;
         this.handledSourceEvent = new ArrayList<>();
         this.successfulCheckpoints = new ArrayList<>();
         this.started = false;
         this.closed = false;
+        recalculateAssignments(unassignedSplits);
     }
 
     @Override
@@ -83,25 +86,36 @@ public class MockSplitEnumerator
 
     @Override
     public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
-        unassignedSplits.addAll(splits);
+        // add back to same subtaskId.
+        putPendingAssignments(subtaskId, splits);
     }
 
     @Override
     public void addReader(int subtaskId) {
-        List<MockSourceSplit> assignment = new ArrayList<>();
-        for (MockSourceSplit split : unassignedSplits) {
-            if (Integer.parseInt(split.splitId()) % 
enumContext.currentParallelism() == subtaskId) {
-                assignment.add(split);
+        ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId);
+        List<MockSourceSplit> splitsOnRecovery = 
readerInfo.getReportedSplitsOnRegistration();
+
+        List<MockSourceSplit> redistributedSplits = new ArrayList<>();
+        List<MockSourceSplit> addBackSplits = new ArrayList<>();
+        for (MockSourceSplit split : splitsOnRecovery) {
+            if (!globalSplitAssignment.containsKey(split.splitId())) {
+                // if the split is not present in globalSplitAssignment, it 
means that this split is
+                // being registered for the first time and is eligible for 
redistribution.
+                redistributedSplits.add(split);
+            } else if (!globalSplitAssignment.containsKey(split.splitId())) {
+                //  if split is already assigned to other sub-task, just 
ignore it. Otherwise, add
+                // back to this sub-task again.
+                addBackSplits.add(split);
             }
         }
-        enumContext.assignSplits(
-                new SplitsAssignment<>(Collections.singletonMap(subtaskId, 
assignment)));
-        unassignedSplits.removeAll(assignment);
+        recalculateAssignments(redistributedSplits);
+        putPendingAssignments(subtaskId, addBackSplits);
+        assignAllSplits();
     }
 
     @Override
     public Set<MockSourceSplit> snapshotState(long checkpointId) {
-        return unassignedSplits;
+        return getUnassignedSplits();
     }
 
     @Override
@@ -114,11 +128,6 @@ public class MockSplitEnumerator
         this.closed = true;
     }
 
-    public void addNewSplits(List<MockSourceSplit> newSplits) {
-        unassignedSplits.addAll(newSplits);
-        assignAllSplits();
-    }
-
     // --------------------
 
     public boolean started() {
@@ -130,7 +139,9 @@ public class MockSplitEnumerator
     }
 
     public Set<MockSourceSplit> getUnassignedSplits() {
-        return unassignedSplits;
+        return pendingSplitAssignment.values().stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
     }
 
     public List<SourceEvent> getHandledSourceEvent() {
@@ -145,17 +156,27 @@ public class MockSplitEnumerator
 
     private void assignAllSplits() {
         Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
-        unassignedSplits.forEach(
-                split -> {
-                    int subtaskId =
-                            Integer.parseInt(split.splitId()) % 
enumContext.currentParallelism();
-                    if 
(enumContext.registeredReaders().containsKey(subtaskId)) {
-                        assignment
-                                .computeIfAbsent(subtaskId, ignored -> new 
ArrayList<>())
-                                .add(split);
-                    }
-                });
+        for (Map.Entry<Integer, Set<MockSourceSplit>> iter : 
pendingSplitAssignment.entrySet()) {
+            Integer subtaskId = iter.getKey();
+            if (enumContext.registeredReaders().containsKey(subtaskId)) {
+                assignment.put(subtaskId, new ArrayList<>(iter.getValue()));
+            }
+        }
         enumContext.assignSplits(new SplitsAssignment<>(assignment));
-        assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+        assignment.keySet().forEach(pendingSplitAssignment::remove);
+    }
+
+    private void recalculateAssignments(Collection<MockSourceSplit> newSplits) 
{
+        for (MockSourceSplit split : newSplits) {
+            int subtaskId = Integer.parseInt(split.splitId()) % 
enumContext.currentParallelism();
+            putPendingAssignments(subtaskId, Collections.singletonList(split));
+        }
+    }
+
+    private void putPendingAssignments(int subtaskId, 
Collection<MockSourceSplit> splits) {
+        Set<MockSourceSplit> pendingSplits =
+                pendingSplitAssignment.computeIfAbsent(subtaskId, 
HashSet::new);
+        pendingSplits.addAll(splits);
+        splits.forEach(split -> globalSplitAssignment.put(split.splitId(), 
subtaskId));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 680013b8862..f75c9be0cea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
     /** The Source that is associated with this SourceCoordinator. */
     private final Source<?, SplitT, EnumChkT> source;
 
+    /** The serializer that handles the serde of the split. */
+    private final SimpleVersionedSerializer<SplitT> splitSerializer;
+
     /** The serializer that handles the serde of the SplitEnumerator 
checkpoints. */
     private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
 
@@ -163,6 +166,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
         this.operatorName = operatorName;
         this.source = source;
         this.enumCheckpointSerializer = 
source.getEnumeratorCheckpointSerializer();
+        this.splitSerializer = source.getSplitSerializer();
         this.context = context;
         this.coordinatorStore = coordinatorStore;
         this.watermarkAlignmentParams = watermarkAlignmentParams;
@@ -427,7 +431,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
                                 // assignments
                                 byte[] assignmentData =
                                         context.getAssignmentTracker()
-                                                
.snapshotState(source.getSplitSerializer());
+                                                
.snapshotState(splitSerializer);
                                 out.writeInt(assignmentData.length);
                                 out.write(assignmentData);
 
@@ -680,7 +684,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
     }
 
     private void handleReaderRegistrationEvent(
-            int subtask, int attemptNumber, ReaderRegistrationEvent event) {
+            int subtask, int attemptNumber, ReaderRegistrationEvent event) 
throws Exception {
         checkArgument(subtask == event.subtaskId());
 
         LOG.info(
@@ -692,7 +696,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
 
         final boolean subtaskReaderExisted =
                 context.registeredReadersOfAttempts().containsKey(subtask);
-        context.registerSourceReader(subtask, attemptNumber, event.location());
+        context.registerSourceReader(
+                subtask, attemptNumber, event.location(), 
event.splits(splitSerializer));
         if (!subtaskReaderExisted) {
             enumerator.addReader(event.subtaskId());
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index b469d6ecfd7..3f9d78b5864 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -465,8 +465,10 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
      * @param subtaskId the subtask id of the source reader.
      * @param attemptNumber the attempt number of the source reader.
      * @param location the location of the source reader.
+     * @param splits the split restored from source reader's state.
      */
-    void registerSourceReader(int subtaskId, int attemptNumber, String 
location) {
+    void registerSourceReader(
+            int subtaskId, int attemptNumber, String location, List<SplitT> 
splits) {
         final Map<Integer, ReaderInfo> attemptReaders =
                 registeredReaders.computeIfAbsent(subtaskId, k -> new 
ConcurrentHashMap<>());
         checkState(
@@ -474,7 +476,7 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
                 "ReaderInfo of subtask %s (#%s) already exists.",
                 subtaskId,
                 attemptNumber);
-        attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+        attemptReaders.put(attemptNumber, 
ReaderInfo.createReaderInfo(subtaskId, location, splits));
 
         sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index 2f707cd550f..62483215740 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -18,11 +18,20 @@ limitations under the License.
 
 package org.apache.flink.runtime.source.event;
 
+import org.apache.flink.api.connector.source.SourceSplit;
+import 
org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /**
- * An {@link OperatorEvent} that registers a {@link
- * org.apache.flink.api.connector.source.SourceReader SourceReader} to the 
SourceCoordinator.
+ * The SourceOperator should always send the ReaderRegistrationEvent with the
+ * `reportedSplitsOnRegistration` list. But it will not add the splits to 
readers if {@link
+ * SupportsSplitReassignmentOnRecovery} is implemented.
  */
 public class ReaderRegistrationEvent implements OperatorEvent {
 
@@ -30,10 +39,44 @@ public class ReaderRegistrationEvent implements 
OperatorEvent {
 
     private final int subtaskId;
     private final String location;
+    private final ArrayList<byte[]> splits;
 
     public ReaderRegistrationEvent(int subtaskId, String location) {
         this.subtaskId = subtaskId;
         this.location = location;
+        this.splits = new ArrayList<>();
+    }
+
+    ReaderRegistrationEvent(int subtaskId, String location, ArrayList<byte[]> 
splits) {
+        this.subtaskId = subtaskId;
+        this.location = location;
+        this.splits = splits;
+    }
+
+    public static <SplitT extends SourceSplit>
+            ReaderRegistrationEvent createReaderRegistrationEvent(
+                    int subtaskId,
+                    String location,
+                    List<SplitT> splits,
+                    SimpleVersionedSerializer<SplitT> splitSerializer)
+                    throws IOException {
+        ArrayList<byte[]> result = new ArrayList<>();
+        for (SplitT split : splits) {
+            result.add(splitSerializer.serialize(split));
+        }
+        return new ReaderRegistrationEvent(subtaskId, location, result);
+    }
+
+    public <SplitT extends SourceSplit> List<SplitT> splits(
+            SimpleVersionedSerializer<SplitT> splitSerializer) throws 
IOException {
+        if (splits.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<SplitT> result = new ArrayList<>(splits.size());
+        for (byte[] serializedSplit : splits) {
+            
result.add(splitSerializer.deserialize(splitSerializer.getVersion(), 
serializedSplit));
+        }
+        return result;
     }
 
     public int subtaskId() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index e6cb4b99518..09b12007d33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -215,6 +215,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     /** Watermark identifier to whether the watermark are aligned. */
     private final Map<String, Boolean> watermarkIsAlignedMap;
 
+    private final boolean supportsSplitReassignmentOnRecovery;
+
     public SourceOperator(
             StreamOperatorParameters<OUT> parameters,
             FunctionWithException<SourceReaderContext, SourceReader<OUT, 
SplitT>, Exception>
@@ -227,7 +229,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             String localHostname,
             boolean emitProgressiveWatermarks,
             CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
-            Map<String, Boolean> watermarkIsAlignedMap) {
+            Map<String, Boolean> watermarkIsAlignedMap,
+            boolean supportsSplitReassignmentOnRecovery) {
         super(parameters);
         this.readerFactory = checkNotNull(readerFactory);
         this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -242,6 +245,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         this.allowUnalignedSourceSplits = 
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
         this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
         this.watermarkIsAlignedMap = watermarkIsAlignedMap;
+        this.supportsSplitReassignmentOnRecovery = 
supportsSplitReassignmentOnRecovery;
     }
 
     @Override
@@ -411,7 +415,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
         // restore the state if necessary.
         final List<SplitT> splits = 
CollectionUtil.iterableToList(readerState.get());
-        if (!splits.isEmpty()) {
+        if (!splits.isEmpty() && !supportsSplitReassignmentOnRecovery) {
             LOG.info("Restoring state for {} split(s) to reader.", 
splits.size());
             for (SplitT s : splits) {
                 getOrCreateSplitMetricGroup(s.splitId());
@@ -421,7 +425,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         }
 
         // Register the reader to the coordinator.
-        registerReader();
+        registerReader(supportsSplitReassignmentOnRecovery ? splits : 
Collections.emptyList());
 
         sourceMetricGroup.idlingStarted();
         // Start the reader after registration, sending messages in start is 
allowed.
@@ -811,10 +815,13 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         return currentMaxDesiredWatermark < latestWatermark;
     }
 
-    private void registerReader() {
+    private void registerReader(List<SplitT> splits) throws Exception {
         operatorEventGateway.sendEventToCoordinator(
-                new ReaderRegistrationEvent(
-                        
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), localHostname));
+                ReaderRegistrationEvent.createReaderRegistrationEvent(
+                        
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        localHostname,
+                        splits,
+                        splitSerializer));
     }
 
     // --------------- methods for unit tests ------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index caa78b022ce..6d1cbc88897 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import 
org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -128,7 +129,8 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                                 .getTaskManagerExternalAddress(),
                         emitProgressiveWatermarks,
                         
parameters.getContainingTask().getCanEmitBatchOfRecords(),
-                        getSourceWatermarkDeclarations());
+                        getSourceWatermarkDeclarations(),
+                        source instanceof SupportsSplitReassignmentOnRecovery);
 
         
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, 
sourceOperator);
 
@@ -199,7 +201,8 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                     String localHostName,
                     boolean emitProgressiveWatermarks,
                     CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
-                    Collection<? extends WatermarkDeclaration> 
watermarkDeclarations) {
+                    Collection<? extends WatermarkDeclaration> 
watermarkDeclarations,
+                    boolean supportsSplitReassignmentOnRecovery) {
 
         // jumping through generics hoops: cast the generics away to then cast 
them back more
         // strictly typed
@@ -231,6 +234,7 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                 localHostName,
                 emitProgressiveWatermarks,
                 canEmitBatchOfRecords,
-                watermarkIsAlignedMap);
+                watermarkIsAlignedMap,
+                supportsSplitReassignmentOnRecovery);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 4b92191d45b..c4fb85bedce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -57,6 +57,21 @@ class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
 
         final TestingSplitEnumerator<?> enumerator = getEnumerator();
         
assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1, 
2);
+
+        ReaderInfo readerInfoOfSubtask1 =
+                ReaderInfo.createReaderInfo(
+                        1, "subtask_1_location", Collections.singletonList(new 
MockSourceSplit(1)));
+        sourceCoordinator.subtaskReset(1, 1);
+        sourceCoordinator.handleEventFromOperator(
+                1,
+                1,
+                ReaderRegistrationEvent.createReaderRegistrationEvent(
+                        readerInfoOfSubtask1.getSubtaskId(),
+                        readerInfoOfSubtask1.getLocation(),
+                        readerInfoOfSubtask1.getReportedSplitsOnRegistration(),
+                        new MockSourceSplitSerializer()));
+        waitForCoordinatorToProcessActions();
+        
assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfoOfSubtask1);
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 0bf8e526f36..b52403c8eae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -42,9 +42,11 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.jupiter.api.Test;
@@ -598,6 +600,125 @@ class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
         assertThat(coordinator.inferSourceParallelismAsync(2, 
1).get()).isEqualTo(2);
     }
 
+    @Test
+    void testDuplicateRedistribution() throws Exception {
+        final List<MockSourceSplit> splits =
+                Arrays.asList(
+                        new MockSourceSplit(0), new MockSourceSplit(1), new 
MockSourceSplit(2));
+
+        testRedistribution(
+                (coordinator) -> {
+                    registerReader(coordinator, 0, 0, splits);
+                    registerReader(coordinator, 1, 0, Collections.emptyList());
+                    registerReader(coordinator, 2, 0, Collections.emptyList());
+                    waitForCoordinatorToProcessActions();
+                    checkAddSplitEvents(new int[][] {new int[] {1}, new int[] 
{1}, new int[] {1}});
+
+                    // duplicate registration
+                    registerReader(coordinator, 0, 0, splits);
+                    waitForCoordinatorToProcessActions();
+                    //  split 1,2,3 won't be sent again.
+                    checkAddSplitEvents(new int[][] {new int[] {1}, new int[] 
{1}, new int[] {1}});
+                });
+    }
+
+    @Test
+    void testRedistributionInPartialRestartBeforeAnyCheckpoint() throws 
Exception {
+        final List<MockSourceSplit> splits =
+                Arrays.asList(
+                        new MockSourceSplit(0), new MockSourceSplit(1), new 
MockSourceSplit(2));
+
+        testRedistribution(
+                (coordinator) -> {
+                    registerReader(coordinator, 0, 0, splits);
+                    registerReader(coordinator, 1, 0, Collections.emptyList());
+                    registerReader(coordinator, 2, 0, Collections.emptyList());
+                    waitForCoordinatorToProcessActions();
+                    checkAddSplitEvents(new int[][] {new int[] {1}, new int[] 
{1}, new int[] {1}});
+
+                    coordinator.subtaskReset(0, 0);
+                    setReaderTaskReady(coordinator, 0, 1);
+                    registerReader(coordinator, 0, 1, splits);
+                    waitForCoordinatorToProcessActions();
+                    checkAddSplitEvents(
+                            new int[][] {new int[] {1, 1}, new int[] {1}, new 
int[] {1}});
+                });
+    }
+
+    @Test
+    void testRedistributionInPartialRestartAfterCheckpoint() throws Exception {
+        final List<MockSourceSplit> splits =
+                Arrays.asList(
+                        new MockSourceSplit(0), new MockSourceSplit(1), new 
MockSourceSplit(2));
+
+        testRedistribution(
+                (coordinator) -> {
+                    registerReader(coordinator, 0, 0, splits);
+                    registerReader(coordinator, 1, 0, Collections.emptyList());
+                    registerReader(coordinator, 2, 0, Collections.emptyList());
+                    waitForCoordinatorToProcessActions();
+                    checkAddSplitEvents(new int[][] {new int[] {1}, new int[] 
{1}, new int[] {1}});
+
+                    CompletableFuture<byte[]> fulture = new 
CompletableFuture<>();
+                    coordinator.checkpointCoordinator(1, fulture);
+                    fulture.get();
+
+                    coordinator.subtaskReset(0, 0);
+                    setReaderTaskReady(coordinator, 0, 1);
+                    registerReader(
+                            coordinator, 0, 1, Collections.singletonList(new 
MockSourceSplit(0)));
+                    waitForCoordinatorToProcessActions();
+                    checkAddSplitEvents(
+                            new int[][] {new int[] {1, 1}, new int[] {1}, new 
int[] {1}});
+                });
+    }
+
+    void testRedistribution(
+            FutureConsumerWithException<
+                            SourceCoordinator<MockSourceSplit, 
Set<MockSourceSplit>>, Exception>
+                    consumer)
+            throws Exception {
+        try (final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> 
splitEnumerator =
+                        new MockSplitEnumerator(0, context);
+                final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> 
coordinator =
+                        new SourceCoordinator<>(
+                                new JobID(),
+                                OPERATOR_NAME,
+                                new EnumeratorCreatingSource<>(() -> 
splitEnumerator),
+                                context,
+                                new CoordinatorStoreImpl(),
+                                
WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                                null)) {
+
+            coordinator.start();
+            setAllReaderTasksReady(coordinator);
+
+            consumer.accept(coordinator);
+        }
+    }
+
+    private void checkAddSplitEvents(int[][] expectedAssignedSplitNum) {
+        MockSourceSplitSerializer mockSourceSplitSerializer = new 
MockSourceSplitSerializer();
+        assertThat(expectedAssignedSplitNum.length).isEqualTo(NUM_SUBTASKS);
+        for (int i = 0; i < NUM_SUBTASKS; i++) {
+            List<OperatorEvent> sentEventsForSubtask = 
receivingTasks.getSentEventsForSubtask(i);
+            
assertThat(sentEventsForSubtask).hasSize(expectedAssignedSplitNum[i].length);
+            for (int j = 0; j < sentEventsForSubtask.size(); j++) {
+                
assertThat(sentEventsForSubtask.get(j)).isExactlyInstanceOf(AddSplitEvent.class);
+                List<MockSourceSplit> splits;
+                try {
+                    splits =
+                            ((AddSplitEvent<MockSourceSplit>) 
sentEventsForSubtask.get(j))
+                                    .splits(mockSourceSplitSerializer);
+                } catch (Exception e) {
+                    throw new RuntimeException();
+                }
+
+                assertThat(splits).hasSize(expectedAssignedSplitNum[i][j]);
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  test helpers
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 61ece5c357f..0fdb564070a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -141,6 +142,22 @@ abstract class SourceCoordinatorTestBase {
                 new ReaderRegistrationEvent(subtask, 
createLocationFor(subtask, attemptNumber)));
     }
 
+    protected void registerReader(
+            SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> 
coordinator,
+            int subtask,
+            int attemptNumber,
+            List<MockSourceSplit> splits)
+            throws IOException {
+        coordinator.handleEventFromOperator(
+                subtask,
+                attemptNumber,
+                ReaderRegistrationEvent.createReaderRegistrationEvent(
+                        subtask,
+                        createLocationFor(subtask, attemptNumber),
+                        splits,
+                        new MockSourceSplitSerializer()));
+    }
+
     static String createLocationFor(int subtask, int attemptNumber) {
         return String.format("location_%d_%d", subtask, attemptNumber);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 8cc98109fdb..35621fbf5bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -99,7 +99,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         new MockOperatorEventGateway(),
                         1,
                         5,
-                        true);
+                        true,
+                        false);
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
 
@@ -547,7 +548,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         new MockOperatorEventGateway(),
                         1,
                         5,
-                        true);
+                        true,
+                        false);
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
         operator.open();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 25f54a632c5..aa4d2cf7081 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -43,11 +43,14 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
@@ -99,24 +102,46 @@ class SourceOperatorTest {
                 .isNotNull();
     }
 
-    @Test
-    void testOpen() throws Exception {
-        // Initialize the operator.
-        operator.initializeState(context.createStateContext());
-        // Open the operator.
-        operator.open();
-        // The source reader should have been assigned a split.
-        assertThat(mockSourceReader.getAssignedSplits())
-                .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
-        // The source reader should have started.
-        assertThat(mockSourceReader.isStarted()).isTrue();
-
-        // A ReaderRegistrationRequest should have been sent.
-        assertThat(mockGateway.getEventsSent()).hasSize(1);
-        OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
-        assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
-        assertThat(((ReaderRegistrationEvent) operatorEvent).subtaskId())
-                .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testOpen(boolean supportsSplitReassignmentOnRecovery) throws 
Exception {
+        try (SourceOperatorTestContext context =
+                new SourceOperatorTestContext(
+                        false,
+                        false,
+                        WatermarkStrategy.noWatermarks(),
+                        new MockOutput<>(new ArrayList<>()),
+                        supportsSplitReassignmentOnRecovery)) {
+            SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
+            // Initialize the operator.
+            operator.initializeState(context.createStateContext());
+            // Open the operator.
+            operator.open();
+            // The source reader should have been assigned a split.
+            if (supportsSplitReassignmentOnRecovery) {
+                
assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
+            } else {
+                assertThat(context.getSourceReader().getAssignedSplits())
+                        .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+            }
+
+            // The source reader should have started.
+            assertThat(context.getSourceReader().isStarted()).isTrue();
+
+            // A ReaderRegistrationRequest should have been sent.
+            assertThat(context.getGateway().getEventsSent()).hasSize(1);
+            OperatorEvent operatorEvent = 
context.getGateway().getEventsSent().get(0);
+            
assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
+            ReaderRegistrationEvent registrationEvent = 
(ReaderRegistrationEvent) operatorEvent;
+            assertThat(registrationEvent.subtaskId())
+                    .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+            if (supportsSplitReassignmentOnRecovery) {
+                assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer()))
+                        .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+            } else {
+                assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer())).isEmpty();
+            }
+        }
     }
 
     @Test
@@ -210,7 +235,8 @@ class SourceOperatorTest {
                         false,
                         WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                 .withTimestampAssigner((element, 
recordTimestamp) -> element),
-                        new CollectorOutput<>(outputStreamElements));
+                        new CollectorOutput<>(outputStreamElements),
+                        false);
         operator = context.getOperator();
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -251,7 +277,7 @@ class SourceOperatorTest {
 
     @Test
     public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
-        MockSourceSplit restoredSplit = new MockSourceSplit((2));
+        MockSourceSplit restoredSplit = new MockSourceSplit((1));
         StateInitializationContext stateContext =
                 
context.createStateContext(Collections.singletonList(restoredSplit));
         operator.initializeState(stateContext);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index f7e75788eb3..6e198512445 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -75,14 +75,15 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
 
     public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> 
watermarkStrategy)
             throws Exception {
-        this(idle, false, watermarkStrategy, new MockOutput<>(new 
ArrayList<>()));
+        this(idle, false, watermarkStrategy, new MockOutput<>(new 
ArrayList<>()), false);
     }
 
     public SourceOperatorTestContext(
             boolean idle,
             boolean usePerSplitOutputs,
             WatermarkStrategy<Integer> watermarkStrategy,
-            Output<StreamRecord<Integer>> output)
+            Output<StreamRecord<Integer>> output,
+            boolean supportsSplitReassignmentOnRecovery)
             throws Exception {
         mockSourceReader =
                 new MockSourceReader(
@@ -109,7 +110,8 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
                         mockGateway,
                         SUBTASK_INDEX,
                         5,
-                        true);
+                        true,
+                        supportsSplitReassignmentOnRecovery);
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
index 63b7b8ff371..7bef5d80c6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
@@ -54,7 +54,8 @@ class SourceOperatorWatermarksTest {
                                                 new SourceOperatorAlignmentTest
                                                         .PunctuatedGenerator())
                                 .withTimestampAssigner((r, t) -> r),
-                        new MockOutput<>(new ArrayList<>()));
+                        new MockOutput<>(new ArrayList<>()),
+                        false);
         operator = context.getOperator();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 40a9bfe9486..dc8ebc9806e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -64,7 +64,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
             SourceReader<T, MockSourceSplit> reader,
             WatermarkStrategy<T> watermarkStrategy,
             ProcessingTimeService timeService,
-            boolean emitProgressiveWatermarks) {
+            boolean emitProgressiveWatermarks,
+            boolean supportsSplitReassignmentOnRecovery) {
 
         this(
                 parameters,
@@ -74,7 +75,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 new MockOperatorEventGateway(),
                 1,
                 5,
-                emitProgressiveWatermarks);
+                emitProgressiveWatermarks,
+                supportsSplitReassignmentOnRecovery);
     }
 
     public TestingSourceOperator(
@@ -85,7 +87,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
             OperatorEventGateway eventGateway,
             int subtaskIndex,
             int parallelism,
-            boolean emitProgressiveWatermarks) {
+            boolean emitProgressiveWatermarks,
+            boolean supportsSplitReassignmentOnRecovery) {
 
         super(
                 parameters,
@@ -98,7 +101,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 "localhost",
                 emitProgressiveWatermarks,
                 () -> false,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                supportsSplitReassignmentOnRecovery);
 
         this.subtaskIndex = subtaskIndex;
         this.parallelism = parallelism;
@@ -130,6 +134,15 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
             WatermarkStrategy<T> watermarkStrategy,
             boolean emitProgressiveWatermarks)
             throws Exception {
+        return createTestOperator(reader, watermarkStrategy, 
emitProgressiveWatermarks, false);
+    }
+
+    public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            WatermarkStrategy<T> watermarkStrategy,
+            boolean emitProgressiveWatermarks,
+            boolean supportsSplitReassignmentOnRecovery)
+            throws Exception {
 
         AbstractStateBackend abstractStateBackend = new HashMapStateBackend();
         Environment env = new MockEnvironmentBuilder().build();
@@ -168,7 +181,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                         reader,
                         watermarkStrategy,
                         timeService,
-                        emitProgressiveWatermarks);
+                        emitProgressiveWatermarks,
+                        supportsSplitReassignmentOnRecovery);
         sourceOperator.initializeState(stateContext);
         sourceOperator.open();
 

Reply via email to