[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275: URL: https://github.com/apache/flink/pull/20275#discussion_r937419550 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ## @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import 
org.apache.flink.util.SerializedValue; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent around + * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further + * verifying the exactly-once semantics of events in the following conditions: + * + * Stream operator recipient + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the + * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable} + * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream + * operators as the recipient of coordinator events, verifying that stream operators can correctly + * handle received operator events and inform coordinators of completing checkpoint. + * + * Non-source stream task + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently + * of each other. They do not have the upstream-downstream relationship as operators usually do in a + * streaming job, and thus both of them are treated as source tasks. This test class further + * verifies situations when the tested operators are not sources, which means when checkpoint + * barriers are injected into sources, these operators may not have started checkpoint yet. + * + * Unaligned checkpoint + * + * This class tests both aligned and unaligned checkpoints to verify that the correctness of the + * event delivery behavior around checkpoint is not affected by this condition. + * + * Non-global failure + * + * In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r937256425

## flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java: ##
@@ -378,6 +373,10 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven
 checkState(!workLoopRunning);
 checkState(subtaskGateway != null);
+if (((StartEvent) event).lastValue >= 0) {
+nextNumber = ((StartEvent) event).lastValue + 1;

Review Comment:
Suppose the job starts cleanly instead of recovering from a global failover. It seems that `nextNumber` is not explicitly initialized before it is first used, right?

## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ##
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import 
org.apache.flink.util.SerializedValue; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent around + * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further + * verifying the exactly-once semantics of events in the following conditions: + * + * Stream operator recipient + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the + * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable} + * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream + * operators as the recipient of coordinator events, verifying that stream operators can correctly + * handle received operator events and inform coordinators of completing checkpoint. + * + * Non-source stream task + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently + * of each other. They
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275: URL: https://github.com/apache/flink/pull/20275#discussion_r936161829 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ## @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent around + * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further + * verifying the exactly-once semantics of events in the following conditions: + * + * Stream operator recipient + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the + * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable} + * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream + * operators as the recipient of coordinator events, verifying that stream operators can correctly + * handle received operator events and inform coordinators of completing checkpoint. + * + * Non-source stream task + * + * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently + * of each other. They do not have the upstream-downstream relationship as operators usually do in a + * streaming job, and thus both of them are treated as source tasks. This test class further + * verifies situations when the tested operators are not sources, which means when checkpoint + * barriers are injected into sources, these operators may not have started checkpoint yet. + * + * Unaligned checkpoint + * + * This class tests both aligned and unaligned checkpoints to verify that the correctness of the + * event delivery behavior around checkpoint is not
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r934612070

## flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java: ##
@@ -111,30 +108,21 @@
 * 2. Exactly-once alignment between multiple Coordinators
 *
 * After a coordinator completed its checkpoint future, all events sent after that must be held
- * back until the checkpoint barriers have been sent to the sources. That is because from the
- * coordinator's perspective, the events are after the checkpoint, so they must also be after the
- * checkpoint from the source task's perspective.
+ * back until its subtasks completed their checkpoint. That is because from the coordinator's
+ * perspective, the events are after the checkpoint, so they must also be after the checkpoint from
+ * the subtask's perspective.
 *
 * When multiple coordinators exist, there are time spans during which some coordinators finished
 * their checkpoints, but others did not yet, and hence the source checkpoint barriers are not yet
 * injected (that happens only once all coordinators are done with their checkpoint). The events
- * from the earlier coordinators must be blocked until all coordinators finished their checkpoints
- * and the source checkpoint barriers are injected.
- *
- * In the example below, the events {@code c & d} must be held back until after the barrier
- * injection.
- *
- *
- * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
- * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
- *
+ * from the earlier coordinators must be blocked until all coordinators finished their checkpoints ,

Review Comment:
`checkpoints ,` -> `checkpoints,`. Also, `finished` seems to be inconsistent with `complete` in the next line.
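
To make the invariant in the revised Javadoc above concrete (events sent after the coordinator completed its checkpoint are held back until the subtask completed its checkpoint), here is a minimal sketch. It is not Flink's `SubtaskGatewayImpl`: the class `BufferingGatewaySketch`, its method names, and the use of `String` as the event type are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a per-subtask gateway; not Flink's actual implementation. */
final class BufferingGatewaySketch {
    private final Consumer<String> downstream;
    private final List<String> blockedEvents = new ArrayList<>();
    private boolean closed;

    BufferingGatewaySketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Assumed to be called once the coordinator has completed its checkpoint future. */
    void close() {
        closed = true;
    }

    /** Events sent while the gateway is closed are held back instead of delivered. */
    void sendEvent(String event) {
        if (closed) {
            blockedEvents.add(event);
        } else {
            downstream.accept(event);
        }
    }

    /** Assumed to be called once the subtask has completed its checkpoint. */
    void openAndFlush() {
        closed = false;
        // Copy first so that the buffer is already empty while events are released in order.
        List<String> toRelease = new ArrayList<>(blockedEvents);
        blockedEvents.clear();
        for (String event : toRelease) {
            downstream.accept(event);
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        BufferingGatewaySketch gateway = new BufferingGatewaySketch(delivered::add);
        gateway.sendEvent("a");   // delivered immediately
        gateway.close();          // coordinator checkpoint completed
        gateway.sendEvent("c");   // held back
        gateway.openAndFlush();   // subtask checkpoint completed, "c" is released
        System.out.println(delivered);
    }
}
```

In this sketch an event sent between `close()` and `openAndFlush()` reaches the subtask only after the subtask's checkpoint, which matches the "after the checkpoint from the subtask's perspective" wording in the Javadoc.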
## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import 
org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r932818606

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ##
@@ -121,7 +118,15 @@
 private final OperatorID operatorId;
 private final LazyInitializedCoordinatorContext context;
 private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-private final OperatorEventValve eventValve;
+
+/**
+ * A map that manages subtask gateways. It is used to control the opening/closing of each
+ * gateway during checkpoint. As checkpoints only work when concurrent execution attempt is

Review Comment:
The following statement might be more explicit:

`As checkpoints only work when concurrent execution attempt is ...` -> `This map should only be read when concurrent execution attempt is disabled. Note that concurrent execution attempt is currently guaranteed to be disabled when checkpoint is enabled.`

## flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java: ##
@@ -228,4 +234,44 @@ public void triggerTaskFailover(Throwable cause) {
 // ignore this in the tests
 }
 }
+
+private static class NoMainThreadCheckComponentMainThreadExecutor

Review Comment:
Can you add comments for this class similar to how other `ComponentMainThreadExecutor` subclasses are documented?
## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ## @@ -91,27 +130,133 @@ public CompletableFuture sendEvent(OperatorEvent evt) { new FlinkException(msg, failure))); } }, -sendingExecutor); +mainThreadExecutor); -sendingExecutor.execute( +mainThreadExecutor.execute( () -> { -sender.sendEvent(sendAction, sendResult); +sendEventInternal(sendAction, sendResult); incompleteFuturesTracker.trackFutureWhileIncomplete(result); }); + return result; } -@Override -public ExecutionAttemptID getExecution() { -return subtaskAccess.currentAttempt(); +private void sendEventInternal( +Callable> sendAction, +CompletableFuture result) { +checkRunsInMainThread(); + +if (isClosed) { +blockedEvents.add(new BlockedEvent(sendAction, result)); +} else { +callSendAction(sendAction, result); +} } -@Override -public int getSubtask() { -return subtaskAccess.getSubtaskIndex(); +private void callSendAction( +Callable> sendAction, +CompletableFuture result) { +try { +final CompletableFuture sendResult = sendAction.call(); +FutureUtils.forward(sendResult, result); +} catch (Throwable t) { +ExceptionUtils.rethrowIfFatalError(t); +result.completeExceptionally(t); +} } -private boolean isReady() { -return subtaskAccess.hasSwitchedToRunning().isDone(); +/** + * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only + * allow closing the gateway for this specific checkpoint. + * + * This is the gateway's mechanism to detect situations where multiple coordinator + * checkpoints would be attempted overlapping, which is currently not supported (the gateway + * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations + * where the checkpoint was aborted even before the gateway was closed (by finding out that the + * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}. 
+ */ +void markForCheckpoint(long checkpointId) { +checkRunsInMainThread(); + +if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) { +throw new IllegalStateException( +String.format( +"Cannot mark for checkpoint %d, already marked for checkpoint %d", +checkpointId, currentCheckpointId)); +} + +if (checkpointId > lastCheckpointId) { +currentCheckpointId = checkpointId; +lastCheckpointId = checkpointId; +} else { +throw new IllegalStateException( +String.format( +"Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", +lastCheckpointId, checkpointId)); +} +} + +/** + * Closes the gateway. All events sent through this gateway are blocked until the gateway is + * re-opened. If
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r930593185

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ##
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
 }
 }
+private boolean closeGateways(
+final long checkpointId, final Set subtasksToCheckpoint) {
+boolean hasCloseableGateway = false;
+for (int subtask : subtasksToCheckpoint) {
+SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
If we do want to capture that bug, how about we consistently call `tryCloseGateway()` for every subtask and check the condition at the end of the loop, instead of stopping at the first subtask whose `tryCloseGateway()` returns false?

Otherwise, suppose `subtask_1.tryCloseGateway() == false` and `subtask_2.tryCloseGateway() == true`: the method will not throw an exception. This makes the method's behavior harder to understand.

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java: ##
@@ -18,20 +18,16 @@
 package org.apache.flink.runtime.operators.coordination;
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
+/**
+ * An {@link OperatorEvent} sent from a subtask to its {@link OperatorCoordinator} to signal that
+ * the checkpoint of an individual task is completed.
+ */
+public class AcknowledgeCheckpointEvent implements OperatorEvent {
-/** Simple interface for a component that takes and sends events. */
-@FunctionalInterface
-interface EventSender {
+/** The ID of the checkpoint that this event is related to. */
+final long checkpointId;

Review Comment:
Is there any existing example where we make a field non-private and do not provide an accessor method for it? If not, it seems better to keep the code style consistent with existing code.

## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java: ##
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import
[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r927315690

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ##
@@ -284,22 +311,24 @@ private void checkpointCoordinatorInternal(
 (success, failure) -> {
 if (failure != null) {
 result.completeExceptionally(failure);
-} else if (eventValve.tryShutValve(checkpointId)) {
+} else if (closeGateways(checkpointId, subtasksToCheckpoint)) {
 completeCheckpointOnceEventsAreDone(checkpointId, result, success);
 } else {
-// if we cannot shut the valve, this means the checkpoint
+// if we cannot close the gateway, this means the checkpoint
 // has been aborted before, so the future is already
 // completed exceptionally. but we try to complete it here
 // again, just in case, as a safety net.
 result.completeExceptionally(
-new FlinkException("Cannot shut event valve"));
+new FlinkException("Cannot close gateway"));
 }
 return null;
 },
 mainThreadExecutor));
 try {
-eventValve.markForCheckpoint(checkpointId);
+for (int subtask : subtasksToCheckpoint) {
+subtaskGatewayMap.get(subtask).markForCheckpoint(checkpointId);

Review Comment:
Would it be simpler to call `markForCheckpoint()` for every subtask instead of additionally passing `subtasksToCheckpoint` as a function input parameter?

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ##
@@ -120,7 +118,7 @@
 private final OperatorID operatorId;
 private final LazyInitializedCoordinatorContext context;
 private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-private final OperatorEventValve eventValve;
+private final Map subtaskGatewayMap = new HashMap<>();

Review Comment:
Should we instantiate this variable in the constructor for consistency with `unconfirmedEvents`?
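
For illustration, the alternatives raised in these comments (instantiating the map in the constructor, marking every gateway rather than a passed-in subset, and the related suggestion in this thread to call `tryCloseGateway()` on every subtask before checking the outcome) could look roughly like the sketch below. `GatewaySketch`, `HolderSketch`, and all of their methods are hypothetical simplifications, not the classes in the PR. In particular, treating a mixed close result as an `IllegalStateException` is only one possible reading of the suggestion, and whether marking every gateway is correct when some subtasks are not part of the checkpoint is left to the PR author.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified gateway that only tracks the checkpoint it is marked for. */
final class GatewaySketch {
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private long currentCheckpointId = NO_CHECKPOINT;
    private boolean closed;

    void markForCheckpoint(long checkpointId) {
        currentCheckpointId = checkpointId;
    }

    /** Closes the gateway only if it was marked for exactly this checkpoint. */
    boolean tryCloseGateway(long checkpointId) {
        if (currentCheckpointId == checkpointId) {
            closed = true;
            return true;
        }
        return false;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Hypothetical holder; stands in for the role of OperatorCoordinatorHolder in the diff. */
final class HolderSketch {
    private final Map<Integer, GatewaySketch> subtaskGatewayMap;

    HolderSketch(int parallelism) {
        // Instantiated in the constructor rather than at the field declaration.
        this.subtaskGatewayMap = new HashMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskGatewayMap.put(subtask, new GatewaySketch());
        }
    }

    /** Marks every gateway, so no subset of subtasks has to be passed in. */
    void markAllForCheckpoint(long checkpointId) {
        for (GatewaySketch gateway : subtaskGatewayMap.values()) {
            gateway.markForCheckpoint(checkpointId);
        }
    }

    /** Tries to close every gateway first, then checks the combined outcome once. */
    boolean closeAllGateways(long checkpointId) {
        int closedCount = 0;
        for (GatewaySketch gateway : subtaskGatewayMap.values()) {
            if (gateway.tryCloseGateway(checkpointId)) {
                closedCount++;
            }
        }
        if (closedCount != 0 && closedCount != subtaskGatewayMap.size()) {
            // Assumption: a mixed result indicates a bug and should not pass silently.
            throw new IllegalStateException(
                    "Only " + closedCount + " of " + subtaskGatewayMap.size()
                            + " gateways could be closed for checkpoint " + checkpointId);
        }
        return closedCount == subtaskGatewayMap.size();
    }

    GatewaySketch gateway(int subtask) {
        return subtaskGatewayMap.get(subtask);
    }
}
```

With this shape, the outcome no longer depends on the iteration order of the subtasks: either all gateways close, none do, or the inconsistency is reported.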

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java: ##
@@ -59,15 +66,25 @@ public static CompletableFuture triggerCoordinatorCheckpoin
 public static CompletableFuture triggerAllCoordinatorCheckpoints(
 final Collection coordinators,
-final long checkpointId)
+final PendingCheckpoint checkpoint)
 throws Exception {
 final Collection> individualSnapshots = new ArrayList<>(coordinators.size());
 for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
+Set subtasksToCheckpoint = new HashSet<>();

Review Comment:
Is there any conceptual difference between `tasksToWaitFor` and `subtasksToCheckpoint`? If not, it is probably more readable to use a name that is consistent with `CheckpointPlan::getTasksToWaitFor()`.

## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ##
@@ -18,43 +18,82 @@
 package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 /**
 * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
 * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * The methods on