[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-08-04 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r937419550


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * Stream operator recipient
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * Non-source stream task
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * Unaligned checkpoint
+ *
+ * This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * Non-global failure
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-08-03 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r937256425


##
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java:
##
@@ -378,6 +373,10 @@ public void handleEventFromOperator(int subtask, int 
attemptNumber, OperatorEven
 checkState(!workLoopRunning);
 checkState(subtaskGateway != null);
 
+if (((StartEvent) event).lastValue >= 0) {
+nextNumber = ((StartEvent) event).lastValue + 1;

Review Comment:
   Suppose the job starts cleanly instead of from a global failover. Then it seems 
that `nextNumber` is not explicitly initialized before it is first used, right?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * Stream operator recipient
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * Non-source stream task
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-08-03 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r936161829


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * Stream operator recipient
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * Non-source stream task
+ *
+ * In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * Unaligned checkpoint
+ *
+ * This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-08-01 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r934612070


##
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java:
##
@@ -111,30 +108,21 @@
  * 2. Exactly-once alignment between multiple Coordinators
  *
  * After a coordinator completed its checkpoint future, all events sent 
after that must be held
- * back until the checkpoint barriers have been sent to the sources. That is 
because from the
- * coordinator's perspective, the events are after the checkpoint, so they 
must also be after the
- * checkpoint from the source task's perspective.
+ * back until its subtasks completed their checkpoint. That is because from 
the coordinator's
+ * perspective, the events are after the checkpoint, so they must also be 
after the checkpoint from
+ * the subtask's perspective.
  *
  * When multiple coordinators exist, there are time spans during which some 
coordinators finished
  * their checkpoints, but others did not yet, and hence the source checkpoint 
barriers are not yet
  * injected (that happens only once all coordinators are done with their 
checkpoint). The events
- * from the earlier coordinators must be blocked until all coordinators 
finished their checkpoints
- * and the source checkpoint barriers are injected.
- *
- * In the example below, the events {@code c & d} must be held back until 
after the barrier
- * injection.
- *
- * 
- * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . 
|barrier| . e . f
- * Coordinator two events: => . . x . . |trigger| . . . . . . . . . 
.|complete||barrier| . . y . . z
- * 
+ * from the earlier coordinators must be blocked until all coordinators 
finished their checkpoints ,

Review Comment:
   `checkpoints ,` -> `checkpoints,`
   
   And `finished` seems to be inconsistent with `complete` in the next line.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-07-30 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r932818606


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -121,7 +118,15 @@
 private final OperatorID operatorId;
 private final LazyInitializedCoordinatorContext context;
 private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-private final OperatorEventValve eventValve;
+
+/**
+ * A map that manages subtask gateways. It is used to control the 
opening/closing of each
+ * gateway during checkpoint. As checkpoints only work when concurrent 
execution attempt is

Review Comment:
   The following statement might be more explicit:
   
   `As checkpoints only work when concurrent execution attempt is ...` -> `This 
map should only be read when concurrent execution attempt is disabled. Note 
that concurrent execution attempt is currently guaranteed to be disabled when 
checkpoint is enabled.`



##
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java:
##
@@ -228,4 +234,44 @@ public void triggerTaskFailover(Throwable cause) {
 // ignore this in the tests
 }
 }
+
+private static class NoMainThreadCheckComponentMainThreadExecutor

Review Comment:
   Can you add comments for this class similar to how other 
`ComponentMainThreadExecutor` subclasses are documented?



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##
@@ -91,27 +130,133 @@ public CompletableFuture 
sendEvent(OperatorEvent evt) {
 new 
FlinkException(msg, failure)));
 }
 },
-sendingExecutor);
+mainThreadExecutor);
 
-sendingExecutor.execute(
+mainThreadExecutor.execute(
 () -> {
-sender.sendEvent(sendAction, sendResult);
+sendEventInternal(sendAction, sendResult);
 
incompleteFuturesTracker.trackFutureWhileIncomplete(result);
 });
+
 return result;
 }
 
-@Override
-public ExecutionAttemptID getExecution() {
-return subtaskAccess.currentAttempt();
+private void sendEventInternal(
+Callable> sendAction,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (isClosed) {
+blockedEvents.add(new BlockedEvent(sendAction, result));
+} else {
+callSendAction(sendAction, result);
+}
 }
 
-@Override
-public int getSubtask() {
-return subtaskAccess.getSubtaskIndex();
+private void callSendAction(
+Callable> sendAction,
+CompletableFuture result) {
+try {
+final CompletableFuture sendResult = 
sendAction.call();
+FutureUtils.forward(sendResult, result);
+} catch (Throwable t) {
+ExceptionUtils.rethrowIfFatalError(t);
+result.completeExceptionally(t);
+}
 }
 
-private boolean isReady() {
-return subtaskAccess.hasSwitchedToRunning().isDone();
+/**
+ * Marks the gateway for the next checkpoint. This remembers the 
checkpoint ID and will only
+ * allow closing the gateway for this specific checkpoint.
+ *
+ * This is the gateway's mechanism to detect situations where multiple 
coordinator
+ * checkpoints would be attempted overlapping, which is currently not 
supported (the gateway
+ * doesn't keep a list of events blocked per checkpoint). It also helps to 
identify situations
+ * where the checkpoint was aborted even before the gateway was closed (by 
finding out that the
+ * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+ */
+void markForCheckpoint(long checkpointId) {
+checkRunsInMainThread();
+
+if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != 
checkpointId) {
+throw new IllegalStateException(
+String.format(
+"Cannot mark for checkpoint %d, already marked for 
checkpoint %d",
+checkpointId, currentCheckpointId));
+}
+
+if (checkpointId > lastCheckpointId) {
+currentCheckpointId = checkpointId;
+lastCheckpointId = checkpointId;
+} else {
+throw new IllegalStateException(
+String.format(
+"Regressing checkpoint IDs. Previous checkpointId 
= %d, new checkpointId = %d",
+lastCheckpointId, checkpointId));
+}
+}
+
+/**
+ * Closes the gateway. All events sent through this gateway are blocked 
until the gateway is
+ * re-opened. If 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-07-26 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r930593185


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
 }
 }
 
+private boolean closeGateways(
+final long checkpointId, final Set subtasksToCheckpoint) {
+boolean hasCloseableGateway = false;
+for (int subtask : subtasksToCheckpoint) {
+SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
   If we do want to capture that bug, how about we consistently call 
`tryCloseGateway()` for every subtask and check the condition at the end of 
the loop, instead of stopping at the first subtask whose `tryCloseGateway()` 
returns false?
   
   Otherwise, if `subtask_1.tryCloseGateway() == false` and 
`subtask_2.tryCloseGateway() == true`, the method will not throw an exception. 
This makes the method's behavior harder to understand.
   
   
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java:
##
@@ -18,20 +18,16 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
+/**
+ * An {@link OperatorEvent} sent from a subtask to its {@link 
OperatorCoordinator} to signal that
+ * the checkpoint of an individual task is completed.
+ */
+public class AcknowledgeCheckpointEvent implements OperatorEvent {
 
-/** Simple interface for a component that takes and sends events. */
-@FunctionalInterface
-interface EventSender {
+/** The ID of the checkpoint that this event is related to. */
+final long checkpointId;

Review Comment:
   Is there any existing example where we make a field package-private and do not 
provide an accessor method for it?
   
   If not, it seems better to keep the code style consistent with existing code.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java:
##
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

2022-07-22 Thread GitBox


lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r927315690


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -284,22 +311,24 @@ private void checkpointCoordinatorInternal(
 (success, failure) -> {
 if (failure != null) {
 result.completeExceptionally(failure);
-} else if (eventValve.tryShutValve(checkpointId)) {
+} else if (closeGateways(checkpointId, 
subtasksToCheckpoint)) {
 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
 } else {
-// if we cannot shut the valve, this means the 
checkpoint
+// if we cannot close the gateway, this means 
the checkpoint
 // has been aborted before, so the future is 
already
 // completed exceptionally. but we try to 
complete it here
 // again, just in case, as a safety net.
 result.completeExceptionally(
-new FlinkException("Cannot shut event 
valve"));
+new FlinkException("Cannot close 
gateway"));
 }
 return null;
 },
 mainThreadExecutor));
 
 try {
-eventValve.markForCheckpoint(checkpointId);
+for (int subtask : subtasksToCheckpoint) {
+subtaskGatewayMap.get(subtask).markForCheckpoint(checkpointId);

Review Comment:
   Would it be simpler to call `markForCheckpoint()` for every subtask instead 
of additionally passing `subtasksToCheckpoint` as a function input parameter?



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -120,7 +118,7 @@
 private final OperatorID operatorId;
 private final LazyInitializedCoordinatorContext context;
 private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-private final OperatorEventValve eventValve;
+private final Map subtaskGatewayMap = new 
HashMap<>();

Review Comment:
   Should we instantiate this variable in the constructor for consistency with 
`unconfirmedEvents`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java:
##
@@ -59,15 +66,25 @@ public static CompletableFuture 
triggerCoordinatorCheckpoin
 
 public static CompletableFuture 
triggerAllCoordinatorCheckpoints(
 final Collection 
coordinators,
-final long checkpointId)
+final PendingCheckpoint checkpoint)
 throws Exception {
 
 final Collection> 
individualSnapshots =
 new ArrayList<>(coordinators.size());
 
 for (final OperatorCoordinatorCheckpointContext coordinator : 
coordinators) {
+Set subtasksToCheckpoint = new HashSet<>();

Review Comment:
   Is there any conceptual difference between `tasksToWaitFor` and 
`subtasksToCheckpoint`? If not, it is probably more readable to use a name that 
is consistent with `CheckpointPlan::getTasksToWaitFor()`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##
@@ -18,43 +18,82 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface 
that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * Instances of this class can be temporarily closed, blocking events from 
going through,
+ * buffering them, and releasing them later. It is used for "alignment" of 
operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are 
aligned during a common
+ * checkpoint.
+ *
+ * The methods on