zhuzhurk commented on code in PR #24415:
URL: https://github.com/apache/flink/pull/24415#discussion_r1538868578


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsBatchSnapshot.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshot in
+ * no-checkpoint/batch scenarios (for example to support job recovery in batch jobs). Once a {@link
+ * SplitEnumerator} implements this interface, its {@link SplitEnumerator#snapshotState} method
+ * needs to accept -1 (NO_CHECKPOINT) as the argument.
+ */
+@PublicEvolving
+public interface SupportsBatchSnapshot {}

Review Comment:
   -> A decorative interface for {@link SplitEnumerator}. Implementing it means that the split enumerator's {@link SplitEnumerator#snapshotState} method supports taking snapshots in batch processing scenarios. In such scenarios, the checkpointId will always be -1.
   
   I think `-1` does not mean `no-checkpoint` in this case.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -437,9 +437,8 @@ private void setupSubtaskGateway(final SubtaskAccess sta) {
         final SubtaskGatewayImpl gateway =
                 new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents);
 
-        // When concurrent execution attempts is supported, the checkpoint must have been disabled.
-        // Thus, we don't need to maintain subtaskGatewayMap
-        if (!context.isConcurrentExecutionAttemptsSupported()) {
+        // We don't need to maintain subtaskGatewayMap when checkpoint coordinator is null.
+        if (context.getCheckpointCoordinator() != null) {
             subtaskGatewayMap.put(gateway.getSubtask(), gateway);

Review Comment:
   The comment on the field `subtaskGatewayMap` has not been updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java:
##########
@@ -160,6 +160,17 @@ public void resetToCheckpoint(final long checkpointId, @Nullable final byte[] ch
                 });
     }
 
+    @Override
+    public boolean supportsBatchSnapshot() {
+        try {
+            return getInternalCoordinator().supportsBatchSnapshot();
+        } catch (Exception e) {
+            String msg = "Could not get internal coordinator";
+            LOG.error(msg, e);
+            throw new RuntimeException(msg, e);
+        }

Review Comment:
   `getInternalCoordinator()` can be heavy and may block the main thread.
   Given that this method should not trigger any actions on the coordinator, maybe we can directly check that `coordinator.internalCoordinator` is not null and then invoke `internalCoordinator.supportsBatchSnapshot()`.
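
   A minimal sketch of what I have in mind. The types below (`CoordinatorSketch`, `DeferrableCoordinatorSketch`, `RecreateOnResetSketch`) are simplified, hypothetical stand-ins rather than the real Flink classes, and the `volatile` field and the exception type are assumptions made only for illustration:

```java
/** Hypothetical stand-in for OperatorCoordinator; only the method under discussion. */
interface CoordinatorSketch {
    default boolean supportsBatchSnapshot() {
        return false;
    }
}

/** Hypothetical stand-in for the wrapper that holds the internal coordinator. */
final class DeferrableCoordinatorSketch {
    // May still be null if the internal coordinator has not been created yet.
    volatile CoordinatorSketch internalCoordinator;
}

/** Hypothetical stand-in for RecreateOnResetOperatorCoordinator. */
final class RecreateOnResetSketch implements CoordinatorSketch {
    private final DeferrableCoordinatorSketch coordinator = new DeferrableCoordinatorSketch();

    void setInternalCoordinator(CoordinatorSketch internal) {
        coordinator.internalCoordinator = internal;
    }

    @Override
    public boolean supportsBatchSnapshot() {
        // Read the field directly instead of waiting for pending async calls,
        // so this query cannot block the calling thread.
        CoordinatorSketch internal = coordinator.internalCoordinator;
        if (internal == null) {
            throw new IllegalStateException("Internal coordinator is not available yet");
        }
        return internal.supportsBatchSnapshot();
    }
}
```

   Whether a missing internal coordinator should fail fast (as above) or simply return false is a design choice for the PR author.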



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java:
##########
@@ -53,4 +62,62 @@ static byte[] readBytes(DataInputStream in, int size) throws IOException {
         in.readFully(bytes);
         return bytes;
     }
+
+    /** Serialize the assignment by checkpoint ids. */
+    static <SplitT> byte[] writeAssignments(
+            Map<Integer, LinkedHashSet<SplitT>> assignments,
+            SimpleVersionedSerializer<SplitT> splitSerializer)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+            // SplitSerializer version.

Review Comment:
   This comment is unnecessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java:
##########
@@ -65,6 +69,33 @@ public void onCheckpoint(long checkpointId) throws Exception {
         uncheckpointedAssignments = new HashMap<>();
     }
 
+    /**
+     * Take a snapshot of the split assignments.
+     *
+     * @param checkpointId the id of the ongoing checkpoint
+     */
+    public byte[] snapshotState(
+            long checkpointId, SimpleVersionedSerializer<SplitT> splitSerializer) throws Exception {
+        checkState(checkpointId == NO_CHECKPOINT);
+        return SourceCoordinatorSerdeUtils.writeAssignments(
+                uncheckpointedAssignments, splitSerializer);
+    }
+
+    /**
+     * Restore the state of the SplitAssignmentTracker.
+     *
+     * @param splitSerializer The serializer of the splits.
+     * @param assignmentData The state of the SplitAssignmentTracker.
+     * @throws Exception when the state deserialization fails.
+     */
+    public void restoreState(
+            SimpleVersionedSerializer<SplitT> splitSerializer, byte[] assignmentData)
+            throws Exception {
+        // Read the split assignments by checkpoint id.

Review Comment:
   The comment is outdated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java:
##########
@@ -53,4 +62,62 @@ static byte[] readBytes(DataInputStream in, int size) throws IOException {
         in.readFully(bytes);
         return bytes;
     }
+
+    /** Serialize the assignment by checkpoint ids. */

Review Comment:
   The comment is outdated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java:
##########
@@ -234,6 +234,16 @@ default void notifyCheckpointAborted(long checkpointId) {}
      */
     void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
 
+    /**
+     * Whether the operator coordinator supports taking snapshot in no-checkpoint/batch scenarios.
+     * If it returns true, it's {@link OperatorCoordinator#checkpointCoordinator} and {@link
+     * OperatorCoordinator#resetToCheckpoint} methods need to accept {@link
+     * OperatorCoordinator#NO_CHECKPOINT} as the value of checkpoint id. Otherwise, returns false.
+     */
+    default boolean supportsBatchSnapshot() {

Review Comment:
   -> Whether the operator coordinator supports taking snapshots in no-checkpoint/batch scenarios. If it returns true, the {@link OperatorCoordinator#checkpointCoordinator} and {@link OperatorCoordinator#resetToCheckpoint} methods support taking a snapshot and restoring from a snapshot in batch processing scenarios. In such scenarios, the checkpointId will always be -1.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -314,7 +314,8 @@ private void checkpointCoordinatorInternal(
                         (success, failure) -> {
                             if (failure != null) {
                                 result.completeExceptionally(failure);
-                            } else if (closeGateways(checkpointId)) {
+                            } else if (checkpointId == OperatorCoordinator.NO_CHECKPOINT

Review Comment:
   It's a bit odd to use `NO_CHECKPOINT` here, and its description is not accurate in batch scenarios. I'd prefer to introduce a new constant `OperatorCoordinator.BATCH_CHECKPOINT_ID`, which is also -1.
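
   Roughly something like the following. `OperatorCoordinatorIdsSketch` and `BatchSnapshotIds` are hypothetical names used only for this illustration; the only thing taken from the PR is that both ids have the value -1:

```java
/** Hypothetical stand-in for the constants on OperatorCoordinator. */
interface OperatorCoordinatorIdsSketch {
    /** Existing sentinel: no checkpoint is associated with the call. */
    long NO_CHECKPOINT = -1L;

    /** Proposed sentinel: the id passed when snapshotting or restoring in batch jobs. */
    long BATCH_CHECKPOINT_ID = -1L;
}

/** Hypothetical helper showing how call sites would read with the new name. */
final class BatchSnapshotIds {
    private BatchSnapshotIds() {}

    static boolean isBatchSnapshot(long checkpointId) {
        return checkpointId == OperatorCoordinatorIdsSketch.BATCH_CHECKPOINT_ID;
    }
}
```

   The value stays the same, so behavior does not change; only the intent at the call sites becomes clearer.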



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java:
##########
@@ -574,8 +574,7 @@ public boolean hasFinishedTasks() {
                                 },
                                 IteratorChain::new,
                                 false),
-                        new CheckpointStatsTracker(
-                                1, new WatermarkStrategyTest.DummyMetricGroup(), jobId)));
+                        new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), jobId)));

Review Comment:
   This change looks good to me. Just wondering what would happen without this 
change?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -185,11 +185,6 @@ void markForCheckpoint(long checkpointId) {
         if (checkpointId > latestAttemptedCheckpointId) {
             currentMarkedCheckpointIds.add(checkpointId);
             latestAttemptedCheckpointId = checkpointId;
-        } else {

Review Comment:
   Maybe `else if (checkpointId != BATCH_CHECKPOINT_ID)`
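
   As a rough sketch of the idea: `GatewaySketch` is a simplified, hypothetical stand-in for `SubtaskGatewayImpl`, and the initial value of `latestAttemptedCheckpointId` as well as the exception thrown in the removed branch are assumptions here, since the hunk above does not show them:

```java
import java.util.TreeSet;

/** Simplified, hypothetical stand-in for SubtaskGatewayImpl; not the real class. */
final class GatewaySketch {
    /** Assumed to be -1, as proposed elsewhere in this review. */
    static final long BATCH_CHECKPOINT_ID = -1L;

    private final TreeSet<Long> currentMarkedCheckpointIds = new TreeSet<>();

    // Assumption: -1 means that no checkpoint has been attempted yet.
    private long latestAttemptedCheckpointId = -1L;

    void markForCheckpoint(long checkpointId) {
        if (checkpointId > latestAttemptedCheckpointId) {
            currentMarkedCheckpointIds.add(checkpointId);
            latestAttemptedCheckpointId = checkpointId;
        } else if (checkpointId != BATCH_CHECKPOINT_ID) {
            // Keep the regression check for regular checkpoints;
            // only the batch snapshot id is exempt from it.
            throw new IllegalStateException(
                    String.format(
                            "Regressing checkpoint id: previous = %d, new = %d",
                            latestAttemptedCheckpointId, checkpointId));
        }
    }

    long latestAttemptedCheckpointId() {
        return latestAttemptedCheckpointId;
    }

    int markedCheckpointCount() {
        return currentMarkedCheckpointIds.size();
    }
}
```

   This way the sanity check still guards streaming jobs, and a batch snapshot with id -1 passes through without being marked.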



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java:
##########
@@ -53,4 +62,62 @@ static byte[] readBytes(DataInputStream in, int size) throws IOException {
         in.readFully(bytes);
         return bytes;
     }
+
+    /** Serialize the assignment by checkpoint ids. */
+    static <SplitT> byte[] writeAssignments(

Review Comment:
   I'd prefer to name the two methods `serializeAssignments` and `deserializeAssignments`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java:
##########
@@ -65,6 +69,33 @@ public void onCheckpoint(long checkpointId) throws Exception {
         uncheckpointedAssignments = new HashMap<>();
     }
 
+    /**
+     * Take a snapshot of the split assignments.
+     *
+     * @param checkpointId the id of the ongoing checkpoint
+     */
+    public byte[] snapshotState(
+            long checkpointId, SimpleVersionedSerializer<SplitT> splitSerializer) throws Exception {
+        checkState(checkpointId == NO_CHECKPOINT);

Review Comment:
   It looks to me like the `checkpointId` parameter is not needed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java:
##########
@@ -108,15 +105,15 @@ public void testOnCheckpointComplete() throws Exception {
 
         // Complete the first checkpoint.
         tracker.onCheckpointComplete(checkpointId1);
-        assertNull(tracker.assignmentsByCheckpointId(checkpointId1));
+        assertThat(tracker.assignmentsByCheckpointId(checkpointId1)).isNull();
         verifyAssignment(
                 Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
         verifyAssignment(
                 Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
     }
 
     @Test
-    public void testGetAndRemoveUncheckpointedAssignment() throws Exception {
+    void testGetAndRemoveUncheckpointedAssignment() throws Exception {

Review Comment:
   There is one more test case below that has not been fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
