This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b0a8827ffc [FLINK-38564][connector] FLIP-537: Enumerator maintains
global splits distribution for reassignment
7b0a8827ffc is described below
commit 7b0a8827ffc111fcdd714682fb1c66be9d5d981d
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Oct 30 20:55:12 2025 +0800
[FLINK-38564][connector] FLIP-537: Enumerator maintains global splits
distribution for reassignment
[FLINK-38564][connector] FLIP-537: Enumerator maintains global splits
distribution for reassignment.
This closes #27149.
---
.../flink/api/connector/source/ReaderInfo.java | 44 ++++++--
.../SupportsSplitReassignmentOnRecovery.java | 29 +++++
.../source/mocks/MockSplitEnumerator.java | 87 +++++++++------
.../source/coordinator/SourceCoordinator.java | 11 +-
.../coordinator/SourceCoordinatorContext.java | 6 +-
.../source/event/ReaderRegistrationEvent.java | 47 +++++++-
.../streaming/api/operators/SourceOperator.java | 19 +++-
.../api/operators/SourceOperatorFactory.java | 10 +-
.../coordinator/SourceCoordinatorContextTest.java | 15 +++
.../source/coordinator/SourceCoordinatorTest.java | 121 +++++++++++++++++++++
.../coordinator/SourceCoordinatorTestBase.java | 17 +++
.../SourceOperatorSplitWatermarkAlignmentTest.java | 6 +-
.../api/operators/SourceOperatorTest.java | 66 +++++++----
.../api/operators/SourceOperatorTestContext.java | 8 +-
.../operators/SourceOperatorWatermarksTest.java | 3 +-
.../operators/source/TestingSourceOperator.java | 24 +++-
16 files changed, 424 insertions(+), 89 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index 7a93c1c7549..27e6f871f4e 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,9 +21,16 @@ package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
-/** A container class hosting the information of a {@link SourceReader}. */
+/**
+ * A container class hosting the information of a {@link SourceReader}.
+ *
+ * <p>The {@code reportedSplitsOnRegistration} can only be provided when the
source implements
+ * {@link SupportsSplitReassignmentOnRecovery}.
+ */
@Public
public final class ReaderInfo implements Serializable {
@@ -31,10 +38,27 @@ public final class ReaderInfo implements Serializable {
private final int subtaskId;
private final String location;
+ private final List<SourceSplit> reportedSplitsOnRegistration;
public ReaderInfo(int subtaskId, String location) {
+ this(subtaskId, location, Collections.emptyList());
+ }
+
+ ReaderInfo(int subtaskId, String location, List<SourceSplit> splits) {
this.subtaskId = subtaskId;
this.location = location;
+ this.reportedSplitsOnRegistration = splits;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <SplitT extends SourceSplit> ReaderInfo createReaderInfo(
+ int subtaskId, String location, List<SplitT> splits) {
+ return new ReaderInfo(subtaskId, location, (List<SourceSplit>) splits);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <SplitT extends SourceSplit> List<SplitT>
getReportedSplitsOnRegistration() {
+ return (List<SplitT>) reportedSplitsOnRegistration;
}
/**
@@ -52,16 +76,18 @@ public final class ReaderInfo implements Serializable {
}
@Override
- public int hashCode() {
- return Objects.hash(subtaskId, location);
+ public boolean equals(Object o) {
+ if (!(o instanceof ReaderInfo)) {
+ return false;
+ }
+ ReaderInfo that = (ReaderInfo) o;
+ return subtaskId == that.subtaskId
+ && Objects.equals(location, that.location)
+ && Objects.equals(reportedSplitsOnRegistration,
that.reportedSplitsOnRegistration);
}
@Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ReaderInfo)) {
- return false;
- }
- ReaderInfo other = (ReaderInfo) obj;
- return subtaskId == other.subtaskId && location.equals(other.location);
+ public int hashCode() {
+ return Objects.hash(subtaskId, location, reportedSplitsOnRegistration);
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
new file mode 100644
index 00000000000..225eb32dc66
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A decorative interface for {@link Source}. Implementing this interface
indicates that the source
+ * operator needs to report splits to the enumerator on start up and receive
reassignment on
+ * recovery.
+ */
+@PublicEvolving
+public interface SupportsSplitReassignmentOnRecovery {}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 7a3763bc5f9..2adc79360f3 100644
---
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.connector.source.mocks;
+import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -28,20 +29,20 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
/** A mock {@link SplitEnumerator} for unit tests. */
public class MockSplitEnumerator
implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>,
SupportsBatchSnapshot {
- private final SortedSet<MockSourceSplit> unassignedSplits;
+ private final Map<Integer, Set<MockSourceSplit>> pendingSplitAssignment;
+ private final Map<String, Integer> globalSplitAssignment;
private final SplitEnumeratorContext<MockSourceSplit> enumContext;
private final List<SourceEvent> handledSourceEvent;
private final List<Long> successfulCheckpoints;
@@ -50,22 +51,24 @@ public class MockSplitEnumerator
public MockSplitEnumerator(int numSplits,
SplitEnumeratorContext<MockSourceSplit> enumContext) {
this(new HashSet<>(), enumContext);
+ List<MockSourceSplit> unassignedSplits = new ArrayList<>();
for (int i = 0; i < numSplits; i++) {
unassignedSplits.add(new MockSourceSplit(i));
}
+ recalculateAssignments(unassignedSplits);
}
public MockSplitEnumerator(
Set<MockSourceSplit> unassignedSplits,
SplitEnumeratorContext<MockSourceSplit> enumContext) {
- this.unassignedSplits =
- new TreeSet<>(Comparator.comparingInt(o ->
Integer.parseInt(o.splitId())));
- this.unassignedSplits.addAll(unassignedSplits);
+ this.pendingSplitAssignment = new HashMap<>();
+ this.globalSplitAssignment = new HashMap<>();
this.enumContext = enumContext;
this.handledSourceEvent = new ArrayList<>();
this.successfulCheckpoints = new ArrayList<>();
this.started = false;
this.closed = false;
+ recalculateAssignments(unassignedSplits);
}
@Override
@@ -83,25 +86,36 @@ public class MockSplitEnumerator
@Override
public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
- unassignedSplits.addAll(splits);
+ // add back to same subtaskId.
+ putPendingAssignments(subtaskId, splits);
}
@Override
public void addReader(int subtaskId) {
- List<MockSourceSplit> assignment = new ArrayList<>();
- for (MockSourceSplit split : unassignedSplits) {
- if (Integer.parseInt(split.splitId()) %
enumContext.currentParallelism() == subtaskId) {
- assignment.add(split);
+ ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId);
+ List<MockSourceSplit> splitsOnRecovery =
readerInfo.getReportedSplitsOnRegistration();
+
+ List<MockSourceSplit> redistributedSplits = new ArrayList<>();
+ List<MockSourceSplit> addBackSplits = new ArrayList<>();
+ for (MockSourceSplit split : splitsOnRecovery) {
+ if (!globalSplitAssignment.containsKey(split.splitId())) {
+ // if the split is not present in globalSplitAssignment, it
means that this split is
+ // being registered for the first time and is eligible for
redistribution.
+ redistributedSplits.add(split);
+ } else if (!globalSplitAssignment.containsKey(split.splitId())) {
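+ // NOTE(review): the enclosing `else if` repeats the condition of the
+ // preceding `if`, so this branch is unreachable as written. Judging by the
+ // intent described below, it presumably should check whether the split is
+ // recorded for this subtaskId in globalSplitAssignment -- confirm.
+ // If the split is already assigned to another sub-task, just ignore it.
+ // Otherwise, add it back to this sub-task again.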
+ addBackSplits.add(split);
}
}
- enumContext.assignSplits(
- new SplitsAssignment<>(Collections.singletonMap(subtaskId,
assignment)));
- unassignedSplits.removeAll(assignment);
+ recalculateAssignments(redistributedSplits);
+ putPendingAssignments(subtaskId, addBackSplits);
+ assignAllSplits();
}
@Override
public Set<MockSourceSplit> snapshotState(long checkpointId) {
- return unassignedSplits;
+ return getUnassignedSplits();
}
@Override
@@ -114,11 +128,6 @@ public class MockSplitEnumerator
this.closed = true;
}
- public void addNewSplits(List<MockSourceSplit> newSplits) {
- unassignedSplits.addAll(newSplits);
- assignAllSplits();
- }
-
// --------------------
public boolean started() {
@@ -130,7 +139,9 @@ public class MockSplitEnumerator
}
public Set<MockSourceSplit> getUnassignedSplits() {
- return unassignedSplits;
+ return pendingSplitAssignment.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
}
public List<SourceEvent> getHandledSourceEvent() {
@@ -145,17 +156,27 @@ public class MockSplitEnumerator
private void assignAllSplits() {
Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
- unassignedSplits.forEach(
- split -> {
- int subtaskId =
- Integer.parseInt(split.splitId()) %
enumContext.currentParallelism();
- if
(enumContext.registeredReaders().containsKey(subtaskId)) {
- assignment
- .computeIfAbsent(subtaskId, ignored -> new
ArrayList<>())
- .add(split);
- }
- });
+ for (Map.Entry<Integer, Set<MockSourceSplit>> iter :
pendingSplitAssignment.entrySet()) {
+ Integer subtaskId = iter.getKey();
+ if (enumContext.registeredReaders().containsKey(subtaskId)) {
+ assignment.put(subtaskId, new ArrayList<>(iter.getValue()));
+ }
+ }
enumContext.assignSplits(new SplitsAssignment<>(assignment));
- assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+ assignment.keySet().forEach(pendingSplitAssignment::remove);
+ }
+
+ private void recalculateAssignments(Collection<MockSourceSplit> newSplits)
{
+ for (MockSourceSplit split : newSplits) {
+ int subtaskId = Integer.parseInt(split.splitId()) %
enumContext.currentParallelism();
+ putPendingAssignments(subtaskId, Collections.singletonList(split));
+ }
+ }
+
+ private void putPendingAssignments(int subtaskId,
Collection<MockSourceSplit> splits) {
+ Set<MockSourceSplit> pendingSplits =
+ pendingSplitAssignment.computeIfAbsent(subtaskId,
HashSet::new);
+ pendingSplits.addAll(splits);
+ splits.forEach(split -> globalSplitAssignment.put(split.splitId(),
subtaskId));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 680013b8862..f75c9be0cea 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,9 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
/** The Source that is associated with this SourceCoordinator. */
private final Source<?, SplitT, EnumChkT> source;
+ /** The serializer that handles the serde of the split. */
+ private final SimpleVersionedSerializer<SplitT> splitSerializer;
+
/** The serializer that handles the serde of the SplitEnumerator
checkpoints. */
private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
@@ -163,6 +166,7 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
this.operatorName = operatorName;
this.source = source;
this.enumCheckpointSerializer =
source.getEnumeratorCheckpointSerializer();
+ this.splitSerializer = source.getSplitSerializer();
this.context = context;
this.coordinatorStore = coordinatorStore;
this.watermarkAlignmentParams = watermarkAlignmentParams;
@@ -427,7 +431,7 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
// assignments
byte[] assignmentData =
context.getAssignmentTracker()
-
.snapshotState(source.getSplitSerializer());
+
.snapshotState(splitSerializer);
out.writeInt(assignmentData.length);
out.write(assignmentData);
@@ -680,7 +684,7 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
}
private void handleReaderRegistrationEvent(
- int subtask, int attemptNumber, ReaderRegistrationEvent event) {
+ int subtask, int attemptNumber, ReaderRegistrationEvent event)
throws Exception {
checkArgument(subtask == event.subtaskId());
LOG.info(
@@ -692,7 +696,8 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
final boolean subtaskReaderExisted =
context.registeredReadersOfAttempts().containsKey(subtask);
- context.registerSourceReader(subtask, attemptNumber, event.location());
+ context.registerSourceReader(
+ subtask, attemptNumber, event.location(),
event.splits(splitSerializer));
if (!subtaskReaderExisted) {
enumerator.addReader(event.subtaskId());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index b469d6ecfd7..3f9d78b5864 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -465,8 +465,10 @@ public class SourceCoordinatorContext<SplitT extends
SourceSplit>
* @param subtaskId the subtask id of the source reader.
* @param attemptNumber the attempt number of the source reader.
* @param location the location of the source reader.
+ * @param splits the splits restored from the source reader's state.
*/
- void registerSourceReader(int subtaskId, int attemptNumber, String
location) {
+ void registerSourceReader(
+ int subtaskId, int attemptNumber, String location, List<SplitT>
splits) {
final Map<Integer, ReaderInfo> attemptReaders =
registeredReaders.computeIfAbsent(subtaskId, k -> new
ConcurrentHashMap<>());
checkState(
@@ -474,7 +476,7 @@ public class SourceCoordinatorContext<SplitT extends
SourceSplit>
"ReaderInfo of subtask %s (#%s) already exists.",
subtaskId,
attemptNumber);
- attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+ attemptReaders.put(attemptNumber,
ReaderInfo.createReaderInfo(subtaskId, location, splits));
sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index 2f707cd550f..62483215740 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -18,11 +18,20 @@ limitations under the License.
package org.apache.flink.runtime.source.event;
+import org.apache.flink.api.connector.source.SourceSplit;
+import
org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
- * An {@link OperatorEvent} that registers a {@link
- * org.apache.flink.api.connector.source.SourceReader SourceReader} to the
SourceCoordinator.
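+ * An {@link OperatorEvent} that registers a SourceReader to the SourceCoordinator.
+ *
+ * <p>The SourceOperator always sends this event with a {@code reportedSplitsOnRegistration}
+ * list. The list carries the splits restored from the reader state only if the source
+ * implements {@link SupportsSplitReassignmentOnRecovery}; in that case the operator does not
+ * add those splits to its reader itself. Otherwise the list is empty.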
*/
public class ReaderRegistrationEvent implements OperatorEvent {
@@ -30,10 +39,44 @@ public class ReaderRegistrationEvent implements
OperatorEvent {
private final int subtaskId;
private final String location;
+ private final ArrayList<byte[]> splits;
public ReaderRegistrationEvent(int subtaskId, String location) {
this.subtaskId = subtaskId;
this.location = location;
+ this.splits = new ArrayList<>();
+ }
+
+ ReaderRegistrationEvent(int subtaskId, String location, ArrayList<byte[]>
splits) {
+ this.subtaskId = subtaskId;
+ this.location = location;
+ this.splits = splits;
+ }
+
+ public static <SplitT extends SourceSplit>
+ ReaderRegistrationEvent createReaderRegistrationEvent(
+ int subtaskId,
+ String location,
+ List<SplitT> splits,
+ SimpleVersionedSerializer<SplitT> splitSerializer)
+ throws IOException {
+ ArrayList<byte[]> result = new ArrayList<>();
+ for (SplitT split : splits) {
+ result.add(splitSerializer.serialize(split));
+ }
+ return new ReaderRegistrationEvent(subtaskId, location, result);
+ }
+
+ public <SplitT extends SourceSplit> List<SplitT> splits(
+ SimpleVersionedSerializer<SplitT> splitSerializer) throws
IOException {
+ if (splits.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<SplitT> result = new ArrayList<>(splits.size());
+ for (byte[] serializedSplit : splits) {
+
result.add(splitSerializer.deserialize(splitSerializer.getVersion(),
serializedSplit));
+ }
+ return result;
}
public int subtaskId() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index e6cb4b99518..09b12007d33 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -215,6 +215,8 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
/** Watermark identifier to whether the watermark are aligned. */
private final Map<String, Boolean> watermarkIsAlignedMap;
+ private final boolean supportsSplitReassignmentOnRecovery;
+
public SourceOperator(
StreamOperatorParameters<OUT> parameters,
FunctionWithException<SourceReaderContext, SourceReader<OUT,
SplitT>, Exception>
@@ -227,7 +229,8 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
String localHostname,
boolean emitProgressiveWatermarks,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
- Map<String, Boolean> watermarkIsAlignedMap) {
+ Map<String, Boolean> watermarkIsAlignedMap,
+ boolean supportsSplitReassignmentOnRecovery) {
super(parameters);
this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -242,6 +245,7 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
this.allowUnalignedSourceSplits =
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
this.watermarkIsAlignedMap = watermarkIsAlignedMap;
+ this.supportsSplitReassignmentOnRecovery =
supportsSplitReassignmentOnRecovery;
}
@Override
@@ -411,7 +415,7 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
// restore the state if necessary.
final List<SplitT> splits =
CollectionUtil.iterableToList(readerState.get());
- if (!splits.isEmpty()) {
+ if (!splits.isEmpty() && !supportsSplitReassignmentOnRecovery) {
LOG.info("Restoring state for {} split(s) to reader.",
splits.size());
for (SplitT s : splits) {
getOrCreateSplitMetricGroup(s.splitId());
@@ -421,7 +425,7 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
}
// Register the reader to the coordinator.
- registerReader();
+ registerReader(supportsSplitReassignmentOnRecovery ? splits :
Collections.emptyList());
sourceMetricGroup.idlingStarted();
// Start the reader after registration, sending messages in start is
allowed.
@@ -811,10 +815,13 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
return currentMaxDesiredWatermark < latestWatermark;
}
- private void registerReader() {
+ private void registerReader(List<SplitT> splits) throws Exception {
operatorEventGateway.sendEventToCoordinator(
- new ReaderRegistrationEvent(
-
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), localHostname));
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+ localHostname,
+ splits,
+ splitSerializer));
}
// --------------- methods for unit tests ------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index caa78b022ce..6d1cbc88897 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import
org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -128,7 +129,8 @@ public class SourceOperatorFactory<OUT> extends
AbstractStreamOperatorFactory<OU
.getTaskManagerExternalAddress(),
emitProgressiveWatermarks,
parameters.getContainingTask().getCanEmitBatchOfRecords(),
- getSourceWatermarkDeclarations());
+ getSourceWatermarkDeclarations(),
+ source instanceof SupportsSplitReassignmentOnRecovery);
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId,
sourceOperator);
@@ -199,7 +201,8 @@ public class SourceOperatorFactory<OUT> extends
AbstractStreamOperatorFactory<OU
String localHostName,
boolean emitProgressiveWatermarks,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
- Collection<? extends WatermarkDeclaration>
watermarkDeclarations) {
+ Collection<? extends WatermarkDeclaration>
watermarkDeclarations,
+ boolean supportsSplitReassignmentOnRecovery) {
// jumping through generics hoops: cast the generics away to then cast
them back more
// strictly typed
@@ -231,6 +234,7 @@ public class SourceOperatorFactory<OUT> extends
AbstractStreamOperatorFactory<OU
localHostName,
emitProgressiveWatermarks,
canEmitBatchOfRecords,
- watermarkIsAlignedMap);
+ watermarkIsAlignedMap,
+ supportsSplitReassignmentOnRecovery);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 4b92191d45b..c4fb85bedce 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -57,6 +57,21 @@ class SourceCoordinatorContextTest extends
SourceCoordinatorTestBase {
final TestingSplitEnumerator<?> enumerator = getEnumerator();
assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1,
2);
+
+ ReaderInfo readerInfoOfSubtask1 =
+ ReaderInfo.createReaderInfo(
+ 1, "subtask_1_location", Collections.singletonList(new
MockSourceSplit(1)));
+ sourceCoordinator.subtaskReset(1, 1);
+ sourceCoordinator.handleEventFromOperator(
+ 1,
+ 1,
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+ readerInfoOfSubtask1.getSubtaskId(),
+ readerInfoOfSubtask1.getLocation(),
+ readerInfoOfSubtask1.getReportedSplitsOnRegistration(),
+ new MockSourceSplitSerializer()));
+ waitForCoordinatorToProcessActions();
+
assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfoOfSubtask1);
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 0bf8e526f36..b52403c8eae 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -42,9 +42,11 @@ import
org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.jupiter.api.Test;
@@ -598,6 +600,125 @@ class SourceCoordinatorTest extends
SourceCoordinatorTestBase {
assertThat(coordinator.inferSourceParallelismAsync(2,
1).get()).isEqualTo(2);
}
+ @Test
+ void testDuplicateRedistribution() throws Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new
MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[]
{1}, new int[] {1}});
+
+ // duplicate registration
+ registerReader(coordinator, 0, 0, splits);
+ waitForCoordinatorToProcessActions();
+ // splits 0, 1 and 2 won't be sent again.
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[]
{1}, new int[] {1}});
+ });
+ }
+
+ @Test
+ void testRedistributionInPartialRestartBeforeAnyCheckpoint() throws
Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new
MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[]
{1}, new int[] {1}});
+
+ coordinator.subtaskReset(0, 0);
+ setReaderTaskReady(coordinator, 0, 1);
+ registerReader(coordinator, 0, 1, splits);
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(
+ new int[][] {new int[] {1, 1}, new int[] {1}, new
int[] {1}});
+ });
+ }
+
+ @Test
+ void testRedistributionInPartialRestartAfterCheckpoint() throws Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new
MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[]
{1}, new int[] {1}});
+
+ CompletableFuture<byte[]> fulture = new
CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, fulture);
+ fulture.get();
+
+ coordinator.subtaskReset(0, 0);
+ setReaderTaskReady(coordinator, 0, 1);
+ registerReader(
+ coordinator, 0, 1, Collections.singletonList(new
MockSourceSplit(0)));
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(
+ new int[][] {new int[] {1, 1}, new int[] {1}, new
int[] {1}});
+ });
+ }
+
+ void testRedistribution(
+ FutureConsumerWithException<
+ SourceCoordinator<MockSourceSplit,
Set<MockSourceSplit>>, Exception>
+ consumer)
+ throws Exception {
+ try (final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>
splitEnumerator =
+ new MockSplitEnumerator(0, context);
+ final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>
coordinator =
+ new SourceCoordinator<>(
+ new JobID(),
+ OPERATOR_NAME,
+ new EnumeratorCreatingSource<>(() ->
splitEnumerator),
+ context,
+ new CoordinatorStoreImpl(),
+
WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null)) {
+
+ coordinator.start();
+ setAllReaderTasksReady(coordinator);
+
+ consumer.accept(coordinator);
+ }
+ }
+
+ private void checkAddSplitEvents(int[][] expectedAssignedSplitNum) {
+ MockSourceSplitSerializer mockSourceSplitSerializer = new
MockSourceSplitSerializer();
+ assertThat(expectedAssignedSplitNum.length).isEqualTo(NUM_SUBTASKS);
+ for (int i = 0; i < NUM_SUBTASKS; i++) {
+ List<OperatorEvent> sentEventsForSubtask =
receivingTasks.getSentEventsForSubtask(i);
+
assertThat(sentEventsForSubtask).hasSize(expectedAssignedSplitNum[i].length);
+ for (int j = 0; j < sentEventsForSubtask.size(); j++) {
+
assertThat(sentEventsForSubtask.get(j)).isExactlyInstanceOf(AddSplitEvent.class);
+ List<MockSourceSplit> splits;
+ try {
+ splits =
+ ((AddSplitEvent<MockSourceSplit>)
sentEventsForSubtask.get(j))
+ .splits(mockSourceSplitSerializer);
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+
+ assertThat(splits).hasSize(expectedAssignedSplitNum[i][j]);
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// test helpers
// ------------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 61ece5c357f..0fdb564070a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -141,6 +142,22 @@ abstract class SourceCoordinatorTestBase {
new ReaderRegistrationEvent(subtask,
createLocationFor(subtask, attemptNumber)));
}
+ /**
+ * Delivers a reader registration to the coordinator on behalf of a subtask attempt. The event
+ * is built with {@code ReaderRegistrationEvent.createReaderRegistrationEvent} from the subtask
+ * index, the location returned by {@code createLocationFor}, the given splits and a new
+ * {@link MockSourceSplitSerializer}.
+ *
+ * @param coordinator coordinator whose {@code handleEventFromOperator} receives the event
+ * @param subtask subtask index, passed both as the sender and to the event factory
+ * @param attemptNumber attempt number of the sending subtask, also used for the location
+ * @param splits splits handed to the registration event
+ * @throws IOException declared by this helper; presumably raised while the event is created
+ * from the splits (confirm against ReaderRegistrationEvent)
+ */
+ protected void registerReader(
+ SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>
coordinator,
+ int subtask,
+ int attemptNumber,
+ List<MockSourceSplit> splits)
+ throws IOException {
+ coordinator.handleEventFromOperator(
+ subtask,
+ attemptNumber,
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+ subtask,
+ createLocationFor(subtask, attemptNumber),
+ splits,
+ new MockSourceSplitSerializer()));
+ }
+
static String createLocationFor(int subtask, int attemptNumber) {
return String.format("location_%d_%d", subtask, attemptNumber);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 8cc98109fdb..35621fbf5bc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -99,7 +99,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
new MockOperatorEventGateway(),
1,
5,
- true);
+ true,
+ false);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new
HashMapStateBackend()));
@@ -547,7 +548,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
new MockOperatorEventGateway(),
1,
5,
- true);
+ true,
+ false);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new
HashMapStateBackend()));
operator.open();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 25f54a632c5..aa4d2cf7081 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -43,11 +43,14 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
@@ -99,24 +102,46 @@ class SourceOperatorTest {
.isNotNull();
}
- @Test
- void testOpen() throws Exception {
- // Initialize the operator.
- operator.initializeState(context.createStateContext());
- // Open the operator.
- operator.open();
- // The source reader should have been assigned a split.
- assertThat(mockSourceReader.getAssignedSplits())
- .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
- // The source reader should have started.
- assertThat(mockSourceReader.isStarted()).isTrue();
-
- // A ReaderRegistrationRequest should have been sent.
- assertThat(mockGateway.getEventsSent()).hasSize(1);
- OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
- assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
- assertThat(((ReaderRegistrationEvent) operatorEvent).subtaskId())
- .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+ /**
+ * Checks the state after {@code open()} for both values of
+ * {@code supportsSplitReassignmentOnRecovery}: which splits the reader holds, that the reader
+ * is started, and which splits the single registration event carries.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testOpen(boolean supportsSplitReassignmentOnRecovery) throws
Exception {
+ try (SourceOperatorTestContext context =
+ new SourceOperatorTestContext(
+ false,
+ false,
+ WatermarkStrategy.noWatermarks(),
+ new MockOutput<>(new ArrayList<>()),
+ supportsSplitReassignmentOnRecovery)) {
+ SourceOperator<Integer, MockSourceSplit> operator =
context.getOperator();
+ // Initialize the operator.
+ operator.initializeState(context.createStateContext());
+ // Open the operator.
+ operator.open();
+ // With reassignment on recovery enabled the reader must hold no splits after open();
+ // otherwise it must hold exactly MOCK_SPLIT.
+ if (supportsSplitReassignmentOnRecovery) {
+
assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
+ } else {
+ assertThat(context.getSourceReader().getAssignedSplits())
+ .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+ }
+
+ // The source reader should have started.
+ assertThat(context.getSourceReader().isStarted()).isTrue();
+
+ // Exactly one event, a ReaderRegistrationEvent, should have been sent.
+ assertThat(context.getGateway().getEventsSent()).hasSize(1);
+ OperatorEvent operatorEvent =
context.getGateway().getEventsSent().get(0);
+
assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
+ ReaderRegistrationEvent registrationEvent =
(ReaderRegistrationEvent) operatorEvent;
+ assertThat(registrationEvent.subtaskId())
+ .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+ // The registration event carries MOCK_SPLIT only when reassignment on recovery is
+ // enabled; otherwise it carries no splits.
+ if (supportsSplitReassignmentOnRecovery) {
+ assertThat(registrationEvent.splits(new
MockSourceSplitSerializer()))
+ .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+ } else {
+ assertThat(registrationEvent.splits(new
MockSourceSplitSerializer())).isEmpty();
+ }
+ }
@Test
@@ -210,7 +235,8 @@ class SourceOperatorTest {
false,
WatermarkStrategy.<Integer>forMonotonousTimestamps()
.withTimestampAssigner((element,
recordTimestamp) -> element),
- new CollectorOutput<>(outputStreamElements));
+ new CollectorOutput<>(outputStreamElements),
+ false);
operator = context.getOperator();
operator.initializeState(context.createStateContext());
operator.open();
@@ -251,7 +277,7 @@ class SourceOperatorTest {
@Test
public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
- MockSourceSplit restoredSplit = new MockSourceSplit((2));
+ MockSourceSplit restoredSplit = new MockSourceSplit((1));
StateInitializationContext stateContext =
context.createStateContext(Collections.singletonList(restoredSplit));
operator.initializeState(stateContext);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index f7e75788eb3..6e198512445 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -75,14 +75,15 @@ public class SourceOperatorTestContext implements
AutoCloseable {
public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer>
watermarkStrategy)
throws Exception {
- this(idle, false, watermarkStrategy, new MockOutput<>(new
ArrayList<>()));
+ this(idle, false, watermarkStrategy, new MockOutput<>(new
ArrayList<>()), false);
}
public SourceOperatorTestContext(
boolean idle,
boolean usePerSplitOutputs,
WatermarkStrategy<Integer> watermarkStrategy,
- Output<StreamRecord<Integer>> output)
+ Output<StreamRecord<Integer>> output,
+ boolean supportsSplitReassignmentOnRecovery)
throws Exception {
mockSourceReader =
new MockSourceReader(
@@ -109,7 +110,8 @@ public class SourceOperatorTestContext implements
AutoCloseable {
mockGateway,
SUBTASK_INDEX,
5,
- true);
+ true,
+ supportsSplitReassignmentOnRecovery);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new
HashMapStateBackend()));
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
index 63b7b8ff371..7bef5d80c6c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
@@ -54,7 +54,8 @@ class SourceOperatorWatermarksTest {
new SourceOperatorAlignmentTest
.PunctuatedGenerator())
.withTimestampAssigner((r, t) -> r),
- new MockOutput<>(new ArrayList<>()));
+ new MockOutput<>(new ArrayList<>()),
+ false);
operator = context.getOperator();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 40a9bfe9486..dc8ebc9806e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -64,7 +64,8 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
SourceReader<T, MockSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
ProcessingTimeService timeService,
- boolean emitProgressiveWatermarks) {
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery) {
this(
parameters,
@@ -74,7 +75,8 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
new MockOperatorEventGateway(),
1,
5,
- emitProgressiveWatermarks);
+ emitProgressiveWatermarks,
+ supportsSplitReassignmentOnRecovery);
}
public TestingSourceOperator(
@@ -85,7 +87,8 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
OperatorEventGateway eventGateway,
int subtaskIndex,
int parallelism,
- boolean emitProgressiveWatermarks) {
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery) {
super(
parameters,
@@ -98,7 +101,8 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
"localhost",
emitProgressiveWatermarks,
() -> false,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ supportsSplitReassignmentOnRecovery);
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
@@ -130,6 +134,15 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
WatermarkStrategy<T> watermarkStrategy,
boolean emitProgressiveWatermarks)
throws Exception {
+ return createTestOperator(reader, watermarkStrategy,
emitProgressiveWatermarks, false);
+ }
+
+ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+ SourceReader<T, MockSourceSplit> reader,
+ WatermarkStrategy<T> watermarkStrategy,
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery)
+ throws Exception {
AbstractStateBackend abstractStateBackend = new HashMapStateBackend();
Environment env = new MockEnvironmentBuilder().build();
@@ -168,7 +181,8 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
reader,
watermarkStrategy,
timeService,
- emitProgressiveWatermarks);
+ emitProgressiveWatermarks,
+ supportsSplitReassignmentOnRecovery);
sourceOperator.initializeState(stateContext);
sourceOperator.open();