1996fanrui commented on code in PR #27639:
URL: https://github.com/apache/flink/pull/27639#discussion_r2938977038


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -57,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
     private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
     protected final BufferManager bufferManager;
 
+    /**
+     * Future that completes when recovered buffers have been filtered for this channel. This
+     * completes before stateConsumedFuture, enabling earlier RUNNING state transition when
+     * unaligned checkpoint during recovery is enabled.
+     */
+    private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();

Review Comment:
   Master branch:
   
   - The INITIALIZING phase covers downloading state, initializing the state
     backend, and consuming all recovered input and output buffers.
   
   PR branch:
   
   - The INITIALIZING phase covers downloading state, initializing the state
     backend, and filtering the recovered buffers.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java:
##########
@@ -47,9 +47,14 @@ public static <T> StreamTaskInput<T> create(
             Function<Integer, StreamPartitioner<?>> gatePartitioners,
             TaskInfo taskInfo,
             CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
-            Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet) {
+            Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet,
+            boolean unalignedDuringRecoveryEnabled) {
         return rescalingDescriptorinflightDataRescalingDescriptor.equals(
-                        InflightDataRescalingDescriptor.NO_RESCALE)
+                                InflightDataRescalingDescriptor.NO_RESCALE)
+                        // When filter during recovery is enabled, records are already filtered in
+                        // the channel-state-unspilling thread. Use StreamTaskNetworkInput to avoid
+                        // redundant demultiplexing/filtering in the Task thread.
+                        || unalignedDuringRecoveryEnabled

Review Comment:
   When filtering during recovery is enabled, records are already filtered in
   the channel-state-unspilling thread, so `StreamTaskNetworkInput` is used to
   avoid redundant demultiplexing and filtering in the Task thread.
   
   `LocalInputChannel` and `RemoteInputChannel` consume filtered buffers just
   like they consume new buffers. Therefore, `RescalingStreamTaskNetworkInput`
   is not needed when
   `execution.checkpointing.unaligned.during-recovery.enabled` is turned on.
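
   The selection rule described above can be sketched as follows. This is only an illustration of the condition in the diff, not the factory code itself: `InputSelectionSketch`, `InputKind`, and `choose` are hypothetical names, and the `noRescale` flag stands in for the `equals(InflightDataRescalingDescriptor.NO_RESCALE)` check.

```java
/** Hypothetical sketch of the input selection condition; not the Flink factory. */
final class InputSelectionSketch {

    /** Stand-ins for the two input implementations named in the comment. */
    enum InputKind {
        STREAM_TASK_NETWORK_INPUT,
        RESCALING_STREAM_TASK_NETWORK_INPUT
    }

    /**
     * Picks the plain input when there is no rescaling, or when recovered
     * buffers were already filtered before reaching the Task thread.
     */
    static InputKind choose(boolean noRescale, boolean unalignedDuringRecoveryEnabled) {
        return (noRescale || unalignedDuringRecoveryEnabled)
                ? InputKind.STREAM_TASK_NETWORK_INPUT
                : InputKind.RESCALING_STREAM_TASK_NETWORK_INPUT;
    }

    public static void main(String[] args) {
        // Rescaling with the option off still needs the rescaling input.
        System.out.println(choose(false, false));
        // Rescaling with the option on can use the plain input.
        System.out.println(choose(false, true));
    }
}
```

   Under this reading, the rescaling input is only chosen when the job was rescaled and the option is off.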



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -57,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
     private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
     protected final BufferManager bufferManager;
 
+    /**
+     * Future that completes when recovered buffers have been filtered for this channel. This
+     * completes before stateConsumedFuture, enabling earlier RUNNING state transition when
+     * unaligned checkpoint during recovery is enabled.
+     */
+    private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();

Review Comment:
   - `stateConsumedFuture` is the original `CompletableFuture`. It completes
     when all recovered input and output buffers have been consumed.
   - `bufferFilteringCompleteFuture` is the new `CompletableFuture`. It
     completes once all recovered input and output buffers have been filtered.
   
   So `bufferFilteringCompleteFuture` completes early, because it does not need
   to wait for any data processing.
   - After every `bufferFilteringCompleteFuture` has completed, the task
     switches from INITIALIZING to RUNNING.
   - After all tasks are RUNNING, checkpoints can be handled properly.
   
   The current PR significantly reduces the duration of the initialization
   phase. Generally, this phase is very fast because it does not need to wait
   for any data processing.
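
   A minimal sketch of the relationship between the two futures, assuming one pair per channel and that the task waits on all filtering futures before leaving initialization. `InitializationSketch`, `Channel`, and `allFiltered` are hypothetical names for illustration; only the two field names come from the PR.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; not the Flink task or channel classes. */
final class InitializationSketch {

    /** Stand-in for a recovered channel holding the two futures from the PR. */
    static final class Channel {
        final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
        final CompletableFuture<Void> stateConsumedFuture = new CompletableFuture<>();
    }

    /** Completes once every channel has finished filtering its recovered buffers. */
    static CompletableFuture<Void> allFiltered(List<Channel> channels) {
        return CompletableFuture.allOf(
                channels.stream()
                        .map(c -> c.bufferFilteringCompleteFuture)
                        .toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        List<Channel> channels = List.of(new Channel(), new Channel());
        CompletableFuture<Void> readyToRun = allFiltered(channels);
        readyToRun.thenRun(() -> System.out.println("switch INITIALIZING -> RUNNING"));

        // Filtering finishes on every channel; no buffer has been consumed yet.
        channels.forEach(c -> c.bufferFilteringCompleteFuture.complete(null));

        System.out.println("filtering done: " + readyToRun.isDone());
        System.out.println("state consumed: " + channels.get(0).stateConsumedFuture.isDone());
    }
}
```

   The point of the sketch is that the combined filtering future can complete while every `stateConsumedFuture` is still pending, which is what allows the earlier transition.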


