lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r923461372


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {

Review Comment:
   According to the Javadoc of this class, `This class is NOT thread safe, but assumed to be used in a single threaded context`. Should we also invoke `checkRunsInMainThread()` in methods such as `tryCloseGateway()` and `sendEvent()`?
   
   I assume we want to make sure that `isClosed` and `blockedEvents` won't be accessed concurrently.
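   A minimal sketch of the kind of guard suggested above, assuming the gateway simply remembers the thread it is confined to. `MainThreadConfinedGateway`, `closeGateway()`, `blockedCount()` and the plain `Thread` reference are hypothetical stand-ins for illustration only; the PR validates against a `ComponentMainThreadExecutor`, and the body of its `checkRunsInMainThread()` is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical gateway-like class confined to one "main" thread. Every method that touches
 * isClosed or blockedEvents checks the calling thread first, in the spirit of adding
 * checkRunsInMainThread() to tryCloseGateway() and sendEvent().
 */
class MainThreadConfinedGateway {
    private final Thread mainThread;
    private final List<String> blockedEvents = new ArrayList<>();
    private boolean isClosed;

    MainThreadConfinedGateway(Thread mainThread) {
        this.mainThread = mainThread;
    }

    /** Fails fast instead of letting a foreign thread race on the unsynchronized fields. */
    private void checkRunsInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException(
                    "Called from thread '" + Thread.currentThread().getName()
                            + "', expected '" + mainThread.getName() + "'");
        }
    }

    void closeGateway() {
        checkRunsInMainThread();
        isClosed = true;
    }

    /** Buffers the event while the gateway is closed; returns true if it was buffered. */
    boolean sendEvent(String event) {
        checkRunsInMainThread();
        if (isClosed) {
            blockedEvents.add(event);
            return true;
        }
        return false; // an open gateway would forward the event at this point
    }

    int blockedCount() {
        checkRunsInMainThread();
        return blockedEvents.size();
    }
}
```

   With this shape, a call from the wrong thread surfaces as an `IllegalStateException` at the call site rather than as a silent data race on `isClosed` or `blockedEvents`.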



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -236,7 +240,20 @@ public void notifyCheckpointComplete(long checkpointId) {
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointComplete(checkpointId));
+        mainThreadExecutor.execute(
+                () -> {
+                    coordinator.notifyCheckpointComplete(checkpointId);
+                    for (Map.Entry<Integer, CloseableSubtaskGateway> entry :
+                            subtaskGatewayMap.entrySet()) {
+                        if (entry.getValue().isClosed()) {
+                            LOG.warn(

Review Comment:
   Is there any case where this would happen? If not, would it be simpler to throw an exception?
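   Assuming the answer is "no", a fail-fast variant of the loop could look roughly like this. `GatewayStub` and `CheckpointCompleteValidation` are hypothetical names used only to keep the sketch self-contained; only the `isClosed()` check mirrors the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the gateway; only isClosed() matters for this check. */
class GatewayStub {
    private final boolean closed;

    GatewayStub(boolean closed) {
        this.closed = closed;
    }

    boolean isClosed() {
        return closed;
    }
}

class CheckpointCompleteValidation {
    /**
     * Throws if any subtask gateway is still closed after the checkpoint completed, instead of
     * only logging a warning.
     */
    static void checkAllGatewaysOpen(long checkpointId, Map<Integer, GatewayStub> subtaskGatewayMap) {
        for (Map.Entry<Integer, GatewayStub> entry : subtaskGatewayMap.entrySet()) {
            if (entry.getValue().isClosed()) {
                throw new IllegalStateException(
                        String.format(
                                "Gateway of subtask %d is still closed after checkpoint %d completed",
                                entry.getKey(), checkpointId));
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, GatewayStub> gateways = new LinkedHashMap<>();
        gateways.put(0, new GatewayStub(false));
        gateways.put(1, new GatewayStub(true));
        try {
            checkAllGatewaysOpen(42L, gateways);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Throwing turns an unexpected state into a visible failure; keeping `LOG.warn` only makes sense if a still-closed gateway is a legitimate outcome here.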



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {

Review Comment:
   Typically `CloseableXXX` refers to classes that implement the 
`java.io.Closeable` interface. Could we rename this class to reduce confusion?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -277,29 +300,33 @@ private void checkpointCoordinatorInternal(
             final long checkpointId, final CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
-        final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
-
-        FutureUtils.assertNoException(
-                coordinatorCheckpoint.handleAsync(
-                        (success, failure) -> {
-                            if (failure != null) {
-                                result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
-                                completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                            } else {
-                                // if we cannot shut the valve, this means the checkpoint
-                                // has been aborted before, so the future is already
-                                // completed exceptionally. but we try to complete it here
-                                // again, just in case, as a safety net.
-                                result.completeExceptionally(
-                                        new FlinkException("Cannot shut event valve"));
-                            }
-                            return null;
-                        },
-                        mainThreadExecutor));
-
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            subtaskGatewayMap.values().forEach(x -> x.markForCheckpoint(checkpointId));
+
+            final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
+
+            coordinatorCheckpoint.whenComplete(
+                    (success, failure) -> {
+                        if (failure != null) {
+                            result.completeExceptionally(failure);
+                        } else {
+                            closeGateways(checkpointId, result);

Review Comment:
   `coordinatorCheckpoint.whenComplete(...)` does not guarantee that the provided `action` is invoked by the current (main) thread. To make sure `closeGateways()` is still executed by `mainThreadExecutor`, it seems necessary to use `handleAsync(...)` as was done before this PR.
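   A small standalone illustration of this point using only `java.util.concurrent`. `CallbackThreadDemo` and the thread names are made up for the demo; the "main-thread-executor" is just a single-thread executor standing in for `mainThreadExecutor`. A non-async callback registered on a still-incomplete future runs on whichever thread completes it (and, if the future is already complete, immediately on the registering thread); the `*Async(..., executor)` variants always run on the given executor. `whenCompleteAsync(action, executor)` would give the same guarantee as `handleAsync`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: which thread runs a callback registered on a still-incomplete future? */
class CallbackThreadDemo {

    /** whenComplete: the callback runs on whichever thread completes the future. */
    static String threadOfWhenComplete(ExecutorService completer) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();
        CompletableFuture<String> done =
                future.whenComplete(
                        (value, failure) -> callbackThread.set(Thread.currentThread().getName()));
        completer.submit(() -> future.complete("ok")).get();
        done.get(5, TimeUnit.SECONDS);
        return callbackThread.get();
    }

    /** handleAsync with an explicit executor: the callback always runs on that executor. */
    static String threadOfHandleAsync(ExecutorService completer, ExecutorService mainThread)
            throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();
        CompletableFuture<Void> done =
                future.handleAsync(
                        (value, failure) -> {
                            callbackThread.set(Thread.currentThread().getName());
                            return null;
                        },
                        mainThread);
        completer.submit(() -> future.complete("ok")).get();
        done.get(5, TimeUnit.SECONDS);
        return callbackThread.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService completer = Executors.newSingleThreadExecutor(r -> new Thread(r, "completer"));
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-executor"));
        try {
            System.out.println("whenComplete ran on: " + threadOfWhenComplete(completer));
            System.out.println("handleAsync ran on: " + threadOfHandleAsync(completer, mainThread));
        } finally {
            completer.shutdown();
            mainThread.shutdown();
        }
    }
}
```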
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {
+            return;
+        }
+        openGatewayAndUnmarkCheckpoint();
+    }
+
+    /** Opens the gateway, releasing all buffered events. */
+    void openGatewayAndUnmarkCheckpoint() {

Review Comment:
   Would it be more readable to mark this method as either public or private? 
Same for other methods.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {
+            return;
+        }
+        openGatewayAndUnmarkCheckpoint();
+    }
+
+    /** Opens the gateway, releasing all buffered events. */
+    void openGatewayAndUnmarkCheckpoint() {
+        checkRunsInMainThread();
+
+        currentCheckpointId = NO_CHECKPOINT;
+        if (!isClosed) {
+            return;
+        }
+
+        for (BlockedEvent blockedEvent : blockedEvents) {
+            CompletableFuture<Acknowledge> result =
+                    innerGateway.sendEvent(blockedEvent.operatorEvent);
+            FutureUtils.forward(result, blockedEvent.future);
+        }
+        blockedEvents.clear();
+
+        isClosed = false;
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
+        if (isClosed) {
+            CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();

Review Comment:
   Suppose the given operator event cannot be serialized and the gateway is closed: the exception will then be added to the future returned to the caller.
   
   This behavior is inconsistent with the case when the gateway is open. Also note that many callers currently do not check the future returned by this method. This means that an exception that could previously be caught might now be missed.
   
   Would it be safer to keep the previous behavior by always throwing the serialization exception directly to the caller?
   
   Same for the exception that is thrown when the gateway is not ready.
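   A sketch of the suggested behavior, assuming events are validated and serialized eagerly, before the open/closed decision, so both failure modes reach the caller synchronously regardless of gateway state. `EagerSerializingGateway`, `setReady`, `setClosed`, the use of plain Java serialization, and the exception types are all hypothetical choices for this illustration, not the PR's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical gateway that checks readiness and serializes an event before deciding whether to
 * send or buffer it, so failures surface the same way whether the gateway is open or closed.
 */
class EagerSerializingGateway {
    private final List<byte[]> blockedEvents = new ArrayList<>();
    private final List<CompletableFuture<Void>> blockedResults = new ArrayList<>();
    private boolean isReady = true;
    private boolean isClosed;

    void setReady(boolean ready) {
        isReady = ready;
    }

    void setClosed(boolean closed) {
        isClosed = closed;
    }

    CompletableFuture<Void> sendEvent(Object event) {
        if (!isReady) {
            // thrown directly to the caller, not hidden inside the returned future
            throw new IllegalStateException("Gateway is not ready, task not yet running");
        }
        // serialization failures are also thrown directly, before any buffering
        byte[] serialized = serialize(event);
        if (isClosed) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            blockedEvents.add(serialized);
            blockedResults.add(result);
            return result; // would be completed on re-open (omitted from this sketch)
        }
        // stand-in for actually handing the serialized bytes to the task
        return CompletableFuture.completedFuture(null);
    }

    int blockedCount() {
        return blockedEvents.size();
    }

    private static byte[] serialize(Object event) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot serialize operator event", e);
        }
    }
}
```

   Serializing up front also means a buffered event can no longer fail later when the gateway re-opens, so the future only carries delivery failures.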



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {

Review Comment:
   Do we expect the communication from coordinator to operator to always support `openGatewayAndUnmarkCheckpoint`?
   
   If yes, would it be more reasonable to put such methods in the `SubtaskGateway` and merge `CloseableSubtaskGateway` into `SubtaskGatewayImpl`?



-- 
This is an automated message from the Apache Git Service.