1996fanrui commented on code in PR #27639:
URL: https://github.com/apache/flink/pull/27639#discussion_r2938977038
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -57,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
protected final BufferManager bufferManager;
+ /**
+ * Future that completes when recovered buffers have been filtered for this channel. This
+ * completes before stateConsumedFuture, enabling earlier RUNNING state transition when
+ * unaligned checkpoint during recovery is enabled.
+ */
+ private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
Review Comment:
Master branch:
- The INITIALIZING phase covers downloading state, initializing the state
backend, and consuming all recovered input and output buffers.
PR branch:
- The INITIALIZING phase covers downloading state, initializing the state
backend, and filtering the recovered buffers.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java:
##########
@@ -47,9 +47,14 @@ public static <T> StreamTaskInput<T> create(
Function<Integer, StreamPartitioner<?>> gatePartitioners,
TaskInfo taskInfo,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
- Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet) {
+ Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet,
+ boolean unalignedDuringRecoveryEnabled) {
return rescalingDescriptorinflightDataRescalingDescriptor.equals(
- InflightDataRescalingDescriptor.NO_RESCALE)
+ InflightDataRescalingDescriptor.NO_RESCALE)
+ // When filter during recovery is enabled, records are already filtered in
+ // the channel-state-unspilling thread. Use StreamTaskNetworkInput to avoid
+ // redundant demultiplexing/filtering in the Task thread.
+ || unalignedDuringRecoveryEnabled
Review Comment:
When filtering during recovery is enabled, records are already filtered in the
channel-state-unspilling thread, so `StreamTaskNetworkInput` is used to avoid
redundant demultiplexing/filtering in the Task thread.
`LocalInputChannel` and `RemoteInputChannel` consume filtered buffers just
like they would consume new buffers. Therefore, `RescalingStreamTaskNetworkInput`
is not needed when `execution.checkpointing.unaligned.during-recovery.enabled`
is turned on.
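The selection rule described above can be written out as a small truth table. This is only a sketch: `InputSelectionSketch`, `chooseInput`, and the `noRescale` flag are hypothetical names introduced for illustration, and the returned strings stand in for the two input classes rather than constructing them.

```java
// Hypothetical illustration of the selection condition in the diff above;
// not the actual StreamTaskNetworkInputFactory code.
class InputSelectionSketch {

    // noRescale stands in for the NO_RESCALE comparison in the factory.
    static String chooseInput(boolean noRescale, boolean unalignedDuringRecoveryEnabled) {
        // Either condition is enough to skip the rescaling-aware input:
        // no rescaling happened, or buffers were already filtered during recovery.
        return (noRescale || unalignedDuringRecoveryEnabled)
                ? "StreamTaskNetworkInput"
                : "RescalingStreamTaskNetworkInput";
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean noRescale : flags) {
            for (boolean enabled : flags) {
                System.out.println(
                        "noRescale=" + noRescale
                                + ", unalignedDuringRecoveryEnabled=" + enabled
                                + " -> " + chooseInput(noRescale, enabled));
            }
        }
    }
}
```

Under this reading, the rescaling-aware input is only chosen when rescaling happened and the new option is off.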
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -57,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
protected final BufferManager bufferManager;
+ /**
+ * Future that completes when recovered buffers have been filtered for this channel. This
+ * completes before stateConsumedFuture, enabling earlier RUNNING state transition when
+ * unaligned checkpoint during recovery is enabled.
+ */
+ private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
Review Comment:
- `stateConsumedFuture` is the original `CompletableFuture`. It is completed
when all recovered input and output buffers have been consumed.
- `bufferFilteringCompleteFuture` is the new `CompletableFuture`. It is
completed once all recovered input and output buffers have been filtered, so
it completes quickly and does not need to wait for any data processing.
- After all `bufferFilteringCompleteFuture` instances are completed, the task
switches from INITIALIZING to RUNNING.
- After all tasks are running, checkpoints can be handled properly.
The current PR significantly reduces the duration of the initialization
phase. Generally, this phase is now very fast since it does not need to wait
for any data processing.
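The ordering of the two futures can be sketched with plain `CompletableFuture` calls. This is a minimal sketch, not the Flink implementation: `ChannelSketch`, `RecoveryFuturesSketch`, `allFiltered`, and the event strings are hypothetical names, and only the two future field names are taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a recovered channel; not the Flink class.
class ChannelSketch {
    final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
    final CompletableFuture<Void> stateConsumedFuture = new CompletableFuture<>();
}

class RecoveryFuturesSketch {

    // Completes once every channel has finished filtering its recovered buffers.
    static CompletableFuture<Void> allFiltered(List<ChannelSketch> channels) {
        return CompletableFuture.allOf(
                channels.stream()
                        .map(c -> c.bufferFilteringCompleteFuture)
                        .toArray(CompletableFuture[]::new));
    }

    // Completes once every channel has consumed its recovered state.
    static CompletableFuture<Void> allConsumed(List<ChannelSketch> channels) {
        return CompletableFuture.allOf(
                channels.stream()
                        .map(c -> c.stateConsumedFuture)
                        .toArray(CompletableFuture[]::new));
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<ChannelSketch> channels = List.of(new ChannelSketch(), new ChannelSketch());

        // Assumption: the task may switch to RUNNING as soon as filtering is done.
        allFiltered(channels).thenRun(() -> events.add("RUNNING"));
        allConsumed(channels).thenRun(() -> events.add("STATE_CONSUMED"));

        // Filtering finishes first on every channel...
        channels.forEach(c -> c.bufferFilteringCompleteFuture.complete(null));
        // ...and consumption of the recovered buffers finishes later.
        channels.forEach(c -> c.stateConsumedFuture.complete(null));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the RUNNING transition depends only on the filtering futures, which is why it no longer waits for recovered data to be processed.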
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.