lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r932818606


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -121,7 +118,15 @@
     private final OperatorID operatorId;
     private final LazyInitializedCoordinatorContext context;
     private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-    private final OperatorEventValve eventValve;
+
+    /**
+     * A map that manages subtask gateways. It is used to control the opening/closing of each
+     * gateway during checkpoint. As checkpoints only work when concurrent execution attempt is

Review Comment:
   The following statement might be more explicit:
   
   `As checkpoints only work when concurrent execution attempt is ...` -> `This 
map should only be read when concurrent execution attempts are disabled. Note 
that concurrent execution attempts are currently guaranteed to be disabled when 
checkpointing is enabled.`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java:
##########
@@ -228,4 +234,44 @@ public void triggerTaskFailover(Throwable cause) {
             // ignore this in the tests
         }
     }
+
+    private static class NoMainThreadCheckComponentMainThreadExecutor

Review Comment:
   Can you add comments for this class similar to how other 
`ComponentMainThreadExecutor` subclasses are documented?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -91,27 +130,133 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
                                                         new FlinkException(msg, failure)));
                             }
                         },
-                        sendingExecutor);
+                        mainThreadExecutor);
 
-        sendingExecutor.execute(
+        mainThreadExecutor.execute(
                 () -> {
-                    sender.sendEvent(sendAction, sendResult);
+                    sendEventInternal(sendAction, sendResult);
                     incompleteFuturesTracker.trackFutureWhileIncomplete(result);
                 });
+
         return result;
     }
 
-    @Override
-    public ExecutionAttemptID getExecution() {
-        return subtaskAccess.currentAttempt();
+    private void sendEventInternal(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (isClosed) {
+            blockedEvents.add(new BlockedEvent(sendAction, result));
+        } else {
+            callSendAction(sendAction, result);
+        }
     }
 
-    @Override
-    public int getSubtask() {
-        return subtaskAccess.getSubtaskIndex();
+    private void callSendAction(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result) {
+        try {
+            final CompletableFuture<Acknowledge> sendResult = sendAction.call();
+            FutureUtils.forward(sendResult, result);
+        } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalError(t);
+            result.completeExceptionally(t);
+        }
     }
 
-    private boolean isReady() {
-        return subtaskAccess.hasSwitchedToRunning().isDone();
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        checkRunsInMainThread();
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {

Review Comment:
   Suppose `currentCheckpointId = NO_CHECKPOINT` and `expectedCheckpointId = 10`. 
Should we still throw an exception in this case? If yes, then we need to update 
the condition accordingly.
   
   Also, since the reason for this condition is not that intuitive, could you 
add a comment explaining this check?
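   
   To make the question concrete, here is a minimal standalone sketch of one 
possible resolution. It is not the PR's code: `GatewaySketch`, its `sendEvent` 
helper, and the choice of `Long.MIN_VALUE` for `NO_CHECKPOINT` are assumptions 
made for illustration, and the regressing-ID check and main-thread check are 
left out. In this sketch the "nothing is marked" case is treated as a no-op 
and only a mismatch against a marked checkpoint throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the gateway's checkpoint bookkeeping. */
class GatewaySketch {
    // Assumption: sentinel meaning "not marked for any checkpoint".
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private long currentCheckpointId = NO_CHECKPOINT;
    private boolean isClosed;
    private final List<Runnable> blockedEvents = new ArrayList<>();

    void markForCheckpoint(long checkpointId) {
        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
            throw new IllegalStateException(
                    "Already marked for checkpoint " + currentCheckpointId);
        }
        currentCheckpointId = checkpointId;
    }

    boolean tryCloseGateway(long checkpointId) {
        // Only the checkpoint that was marked may close the gateway.
        if (checkpointId != NO_CHECKPOINT && checkpointId == currentCheckpointId) {
            isClosed = true;
            return true;
        }
        return false;
    }

    void sendEvent(Runnable sendAction) {
        if (isClosed) {
            blockedEvents.add(sendAction); // held back until the gateway re-opens
        } else {
            sendAction.run();
        }
    }

    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
        // Nothing is marked, e.g. the checkpoint was aborted before the gateway
        // was closed. The gateway is already open, so there is nothing to undo.
        if (currentCheckpointId == NO_CHECKPOINT) {
            return;
        }
        // Marked for a different checkpoint: the caller disagrees with the gateway
        // about which checkpoint is in flight, so fail loudly.
        if (expectedCheckpointId != currentCheckpointId) {
            throw new IllegalStateException(
                    "Expected checkpoint " + expectedCheckpointId
                            + " but gateway is marked for " + currentCheckpointId);
        }
        currentCheckpointId = NO_CHECKPOINT;
        isClosed = false;
        // Release the blocked events in the order they were sent.
        for (Runnable blocked : blockedEvents) {
            blocked.run();
        }
        blockedEvents.clear();
    }
}
```

   If the `NO_CHECKPOINT` case should throw instead, the early return would 
simply be dropped, since `expectedCheckpointId != currentCheckpointId` already 
covers it. Either way, a comment on the check would make the intent explicit.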
   
   



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint.
+ *
+ * <p>This class is an extension to {@link
+ * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events when the flink job is constructed using actual
+ * stream operators and verifying the correctness of the behavior of stream operators.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+    private static final AtomicBoolean shouldCloseSource = new 
AtomicBoolean(false);
+
+    private static final int numEvents = 100;
+
+    private static final int delay = 1;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource.set(false);
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
numEvents, delay));
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
numEvents, delay));
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env, new 
FailingCheckpointOperatorFactory<>("failingCheckpoint", numEvents, delay));
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, 
OneInputStreamOperatorFactory<Long, Long> factory)
+            throws Exception {
+        DataStream<Long> stream =
+                env.addSource(new IdlingSourceFunction<>(), 
TypeInformation.of(Long.class))
+                        .setParallelism(2);
+        stream =
+                stream.transform("eventReceiving", 
TypeInformation.of(Long.class), factory)
+                        .setParallelism(1);
+        stream.addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        long count = 
executionResult.getAccumulatorResult(EventReceivingOperator.COUNTER_NAME);
+        assertThat(count).isEqualTo(numEvents);
+    }
+
+    /** A mock source function that does not collect any stream record and 
finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        private boolean isCancelled = false;
+
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!isCancelled && !shouldCloseSource.get()) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCancelled = true;
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, 
OneInputStreamOperatorFactory<IN, OUT> {
+        private final String name;
+        protected final int numEvents;
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventSendingCoordinator(context, name, 
numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new 
EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    
.registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * The stream operator that receives the events and accumulates the 
numbers. The task is
+     * stateful and checkpoints the accumulator.
+     */
+    private static class EventReceivingOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, OperatorEventHandler {
+        protected static final String COUNTER_NAME = "numEvents";
+
+        protected final LongCounter counter = new LongCounter();
+
+        protected ListState<Long> state;
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            getRuntimeContext().addAccumulator(COUNTER_NAME, counter);
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void handleOperatorEvent(OperatorEvent evt) {
+            if (evt instanceof IntegerEvent) {
+                counter.add(1L);
+            } else if (evt instanceof EndEvent) {
+                try {
+                    
state.update(Collections.singletonList(counter.getLocalValue()));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                shouldCloseSource.set(true);
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            state.update(Collections.singletonList(counter.getLocalValue()));
+        }
+
+        @Override
+        public void initializeState(StateInitializationContext context) throws 
Exception {
+            super.initializeState(context);
+
+            state =
+                    context.getOperatorStateStore()
+                            .getListState(
+                                    new ListStateDescriptor<>(
+                                            "counterState", 
BasicTypeInfo.LONG_TYPE_INFO));
+
+            counter.resetLocal();
+            Iterator<Long> iterator = state.get().iterator();
+            if (iterator.hasNext()) {
+                counter.add(iterator.next());
+            }
+            Preconditions.checkArgument(!iterator.hasNext());
+
+            // signal the coordinator to start
+            getContainingTask()
+                    .getEnvironment()
+                    .getOperatorCoordinatorEventGateway()
+                    .sendOperatorEventToCoordinator(
+                            getOperatorID(),
+                            new SerializedValue<>(
+                                    new 
StartEvent(counter.getLocalValue().intValue() - 1)));
+        }
+    }
+
+    private static class FailingCheckpointOperatorFactory<IN, OUT>

Review Comment:
   Should we add Javadoc for this class, similar to how 
`EventReceivingOperatorFactory` is documented?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -392,8 +427,12 @@ public void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumb
     }
 
     private void setupSubtaskGateway(final SubtaskAccess sta) {
-        final OperatorCoordinator.SubtaskGateway gateway =
-                new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor, unconfirmedEvents);
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents);
+
+        if (!context.isConcurrentExecutionAttemptsSupported()) {

Review Comment:
   nit: would it be useful to add the following comment?
   
   When concurrent execution attempts are supported, checkpointing must have 
been disabled, and thus we don't need to maintain subtaskGatewayMap.
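   
   As a rough illustration of where such a comment would sit, here is a 
standalone sketch. `HolderSketch`, its constructor flag, and the `String` 
stand-in for a gateway are hypothetical and not the actual 
`OperatorCoordinatorHolder` API; only the shape of the guarded registration is 
taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the holder's gateway registration. */
class HolderSketch {
    private final boolean concurrentExecutionAttemptsSupported;
    // Assumption: gateways are keyed by subtask index; a String stands in for a gateway.
    private final Map<Integer, String> subtaskGatewayMap = new HashMap<>();

    HolderSketch(boolean concurrentExecutionAttemptsSupported) {
        this.concurrentExecutionAttemptsSupported = concurrentExecutionAttemptsSupported;
    }

    void setupSubtaskGateway(int subtaskIndex, String gateway) {
        // When concurrent execution attempts are supported, checkpointing must have
        // been disabled, and thus we don't need to maintain subtaskGatewayMap.
        if (!concurrentExecutionAttemptsSupported) {
            subtaskGatewayMap.put(subtaskIndex, gateway);
        }
    }

    int trackedGateways() {
        return subtaskGatewayMap.size();
    }
}
```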



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
+import 
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link SubtaskGatewayImpl}. */
+public class SubtaskGatewayImplTest {
+
+    @Test
+    public void eventsPassThroughOpenGateway() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(11)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        final OperatorEvent event = new TestOperatorEvent();
+        final CompletableFuture<Acknowledge> future = gateway.sendEvent(event);
+
+        assertThat(receiver.events).containsExactly(new 
EventWithSubtask(event, 11));
+        assertThat(future).isDone();
+    }
+
+    @Test
+    public void closingMarkedGateway() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(11)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(200L);
+        final boolean isClosed = gateway.tryCloseGateway(200L);
+
+        assertThat(isClosed).isTrue();
+    }
+
+    @Test
+    public void notClosingUnmarkedGateway() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(11)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        final boolean isClosed = gateway.tryCloseGateway(123L);
+
+        assertThat(isClosed).isFalse();
+    }
+
+    @Test
+    public void notClosingGatewayForOtherMark() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(11)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(100L);
+        final boolean isClosed = gateway.tryCloseGateway(123L);
+
+        assertThat(isClosed).isFalse();
+    }
+
+    @Test
+    public void eventsBlockedByClosedGateway() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(1)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(1L);
+        gateway.tryCloseGateway(1L);
+
+        final CompletableFuture<Acknowledge> future = gateway.sendEvent(new 
TestOperatorEvent());
+
+        assertThat(receiver.events).isEmpty();
+        assertThat(future).isNotDone();
+    }
+
+    @Test
+    public void eventsReleasedAfterOpeningGateway() {
+        final EventReceivingTasks receiver = 
EventReceivingTasks.createForRunningTasks();
+        final SubtaskGatewayImpl gateway0 =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(0)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+        final SubtaskGatewayImpl gateway3 =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(3)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+        List<SubtaskGatewayImpl> gateways = Arrays.asList(gateway3, gateway0);
+
+        gateways.forEach(x -> x.markForCheckpoint(17L));
+        gateways.forEach(x -> x.tryCloseGateway(17L));
+
+        final OperatorEvent event1 = new TestOperatorEvent();
+        final OperatorEvent event2 = new TestOperatorEvent();
+        final CompletableFuture<Acknowledge> future1 = 
gateway3.sendEvent(event1);
+        final CompletableFuture<Acknowledge> future2 = 
gateway0.sendEvent(event2);
+
+        gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint);
+
+        assertThat(receiver.events)
+                .containsExactly(new EventWithSubtask(event1, 3), new 
EventWithSubtask(event2, 0));
+        assertThat(future1).isDone();
+        assertThat(future2).isDone();
+    }
+
+    @Test
+    public void releasedEventsForwardSendFailures() {
+        final EventReceivingTasks receiver =
+                EventReceivingTasks.createForRunningTasksFailingRpcs(new 
FlinkException("test"));
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        getUniqueElement(receiver.getAccessesForSubtask(10)),
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(17L);
+        gateway.tryCloseGateway(17L);
+
+        final CompletableFuture<Acknowledge> future = gateway.sendEvent(new 
TestOperatorEvent());
+        gateway.openGatewayAndUnmarkCheckpoint();
+
+        assertThat(future).isCompletedExceptionally();
+    }
+
+    private static final class RejectingSubtaskGateway

Review Comment:
   Is this class used?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint.
+ *
+ * <p>This class is an extension to {@link
+ * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events when the flink job is constructed using actual

Review Comment:
   It is still not clear what `behavior` this class intends to test and how the 
target behavior differs from that tested by 
`CoordinatorEventsExactlyOnceITCase`.
   
   For example, it is not clear what the problem is with NOT using actual 
stream operators, or what `the correctness of the behavior of stream 
operators` actually means.
   
   The Javadoc of `CoordinatorEventsExactlyOnceITCase` has detailed comments 
explaining the behavior it is trying to verify. Can you follow that example and 
explain what behavior this test is trying to verify?
   
   Also note that the expected behavior described in 
`CoordinatorEventsExactlyOnceITCase`'s Javadoc is now outdated. It says `After a 
coordinator completed its checkpoint future, all events sent after that must be 
held back until the checkpoint barriers have been sent to the sources`, which 
is no longer sufficient for achieving exactly-once semantics. This probably 
also needs to be updated in this PR.


