lindong28 commented on code in PR #20275: URL: https://github.com/apache/flink/pull/20275#discussion_r932818606
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -121,7 +118,15 @@ private final OperatorID operatorId; private final LazyInitializedCoordinatorContext context; private final SubtaskAccess.SubtaskAccessFactory taskAccesses; - private final OperatorEventValve eventValve; + + /** + * A map that manages subtask gateways. It is used to control the opening/closing of each + * gateway during checkpoint. As checkpoints only work when concurrent execution attempt is Review Comment: The following statement might be more explicit: `As checkpoints only work when concurrent execution attempt is ...` -> `This map should only be read when concurrent execution attempt is disabled. Note that concurrent execution attempt is currently guaranteed to be disabled when checkpoint is enabled.` ########## flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java: ########## @@ -228,4 +234,44 @@ public void triggerTaskFailover(Throwable cause) { // ignore this in the tests } } + + private static class NoMainThreadCheckComponentMainThreadExecutor Review Comment: Can you add comments for this class similar to how other `ComponentMainThreadExecutor` subclasses are documented? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ########## @@ -91,27 +130,133 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) { new FlinkException(msg, failure))); } }, - sendingExecutor); + mainThreadExecutor); - sendingExecutor.execute( + mainThreadExecutor.execute( () -> { - sender.sendEvent(sendAction, sendResult); + sendEventInternal(sendAction, sendResult); incompleteFuturesTracker.trackFutureWhileIncomplete(result); }); + return result; } - @Override - public ExecutionAttemptID getExecution() { - return subtaskAccess.currentAttempt(); + private void sendEventInternal( + Callable<CompletableFuture<Acknowledge>> sendAction, + CompletableFuture<Acknowledge> result) { + checkRunsInMainThread(); + + if (isClosed) { + blockedEvents.add(new BlockedEvent(sendAction, result)); + } else { + callSendAction(sendAction, result); + } } - @Override - public int getSubtask() { - return subtaskAccess.getSubtaskIndex(); + private void callSendAction( + Callable<CompletableFuture<Acknowledge>> sendAction, + CompletableFuture<Acknowledge> result) { + try { + final CompletableFuture<Acknowledge> sendResult = sendAction.call(); + FutureUtils.forward(sendResult, result); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + result.completeExceptionally(t); + } } - private boolean isReady() { - return subtaskAccess.hasSwitchedToRunning().isDone(); + /** + * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only + * allow closing the gateway for this specific checkpoint. + * + * <p>This is the gateway's mechanism to detect situations where multiple coordinator + * checkpoints would be attempted overlapping, which is currently not supported (the gateway + * doesn't keep a list of events blocked per checkpoint). 
It also helps to identify situations + * where the checkpoint was aborted even before the gateway was closed (by finding out that the + * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}. + */ + void markForCheckpoint(long checkpointId) { + checkRunsInMainThread(); + + if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) { + throw new IllegalStateException( + String.format( + "Cannot mark for checkpoint %d, already marked for checkpoint %d", + checkpointId, currentCheckpointId)); + } + + if (checkpointId > lastCheckpointId) { + currentCheckpointId = checkpointId; + lastCheckpointId = checkpointId; + } else { + throw new IllegalStateException( + String.format( + "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", + lastCheckpointId, checkpointId)); + } + } + + /** + * Closes the gateway. All events sent through this gateway are blocked until the gateway is + * re-opened. If the gateway is already closed, this does nothing. + * + * @return True if the gateway is closed, false if the checkpointId is incorrect. + */ + boolean tryCloseGateway(long checkpointId) { + checkRunsInMainThread(); + + if (checkpointId == currentCheckpointId) { + isClosed = true; + return true; + } + return false; + } + + boolean isClosed() { + checkRunsInMainThread(); + return isClosed; + } + + void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) { + checkRunsInMainThread(); + + if (expectedCheckpointId != currentCheckpointId) { Review Comment: Suppose `currentCheckpointId = NO_CHECKPOINT` and `checkpointId = 10`. Should we still throw an exception in this case? If yes, then we need to update the condition accordingly. Also, since the reason for this condition is not intuitive, could you add a comment explaining this check? 
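To make the question above concrete, here is a minimal, hypothetical sketch of the mark/close/open bookkeeping. It is not the PR's code: the class name `GatewayCheckpointSketch`, the `send(Runnable)` method, the `Long.MIN_VALUE` value chosen for `NO_CHECKPOINT`, and above all the policy in `openGatewayAndUnmarkCheckpoint` (treat an unmarked gateway as a no-op, throw only on a genuine ID mismatch) are assumptions made for illustration. The hunk above is cut off before the body of the `if`, so the actual behavior may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified model of the gateway's checkpoint bookkeeping (not the PR's code). */
final class GatewayCheckpointSketch {
    // Assumed sentinel meaning "no checkpoint is currently marked".
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private long currentCheckpointId = NO_CHECKPOINT;
    private long lastCheckpointId = NO_CHECKPOINT;
    private boolean isClosed;
    // Events held back while the gateway is closed, modeled as plain Runnables.
    private final Queue<Runnable> blockedEvents = new ArrayDeque<>();

    void markForCheckpoint(long checkpointId) {
        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
            throw new IllegalStateException("Already marked for checkpoint " + currentCheckpointId);
        }
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException("Regressing checkpoint IDs");
        }
        currentCheckpointId = checkpointId;
        lastCheckpointId = checkpointId;
    }

    boolean tryCloseGateway(long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            isClosed = true;
            return true;
        }
        return false;
    }

    /** Runs the event immediately if the gateway is open, otherwise buffers it. */
    void send(Runnable event) {
        if (isClosed) {
            blockedEvents.add(event);
        } else {
            event.run();
        }
    }

    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
        if (currentCheckpointId == NO_CHECKPOINT) {
            // Assumed policy: nothing is marked (for example, the checkpoint was already
            // aborted), so there is nothing to open and the call is a no-op.
            return;
        }
        if (expectedCheckpointId != currentCheckpointId) {
            // A different checkpoint is marked: this indicates a real bookkeeping error.
            throw new IllegalStateException(
                    "Expected checkpoint " + expectedCheckpointId
                            + " but gateway is marked for " + currentCheckpointId);
        }
        isClosed = false;
        currentCheckpointId = NO_CHECKPOINT;
        // Release buffered events in the order they were sent.
        Runnable event;
        while ((event = blockedEvents.poll()) != null) {
            event.run();
        }
    }

    boolean isClosed() {
        return isClosed;
    }

    public static void main(String[] args) {
        GatewayCheckpointSketch gateway = new GatewayCheckpointSketch();
        gateway.openGatewayAndUnmarkCheckpoint(10L); // nothing marked: no-op under this policy
        gateway.markForCheckpoint(10L);
        gateway.tryCloseGateway(10L);
        gateway.send(() -> System.out.println("event delivered"));
        System.out.println("closed: " + gateway.isClosed());
        gateway.openGatewayAndUnmarkCheckpoint(10L);
        System.out.println("closed: " + gateway.isClosed());
    }
}
```

If the intended answer is instead "yes, still throw", the first branch in `openGatewayAndUnmarkCheckpoint` would be dropped and the single `!=` check kept. Either way, a comment on the check stating which of the two cases it guards against would address the second half of the review comment.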
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent around + * checkpoint. + * + * <p>This class is an extension to {@link + * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further + * verifying the exactly-once semantics of events when the flink job is constructed using actual + * stream operators and verifying the correctness of the behavior of stream operators. 
+ */ +public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase + extends CoordinatorEventsExactlyOnceITCase { + private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false); + + private static final int numEvents = 100; + + private static final int delay = 1; + + private StreamExecutionEnvironment env; + + @Before + public void setup() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(100); + shouldCloseSource.set(false); + } + + @Test + public void testCoordinatorSendEvents() throws Exception { + executeAndVerifyResults( + env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay)); + } + + @Test + public void testUnalignedCheckpoint() throws Exception { + env.getCheckpointConfig().enableUnalignedCheckpoints(); + executeAndVerifyResults( + env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay)); + } + + @Test + public void testFailingCheckpoint() throws Exception { + executeAndVerifyResults( + env, new FailingCheckpointOperatorFactory<>("failingCheckpoint", numEvents, delay)); + } + + private void executeAndVerifyResults( + StreamExecutionEnvironment env, OneInputStreamOperatorFactory<Long, Long> factory) + throws Exception { + DataStream<Long> stream = + env.addSource(new IdlingSourceFunction<>(), TypeInformation.of(Long.class)) + .setParallelism(2); + stream = + stream.transform("eventReceiving", TypeInformation.of(Long.class), factory) + .setParallelism(1); + stream.addSink(new DiscardingSink<>()); + + JobExecutionResult executionResult = + MINI_CLUSTER + .getMiniCluster() + .executeJobBlocking(env.getStreamGraph().getJobGraph()); + + long count = executionResult.getAccumulatorResult(EventReceivingOperator.COUNTER_NAME); + assertThat(count).isEqualTo(numEvents); + } + + /** A mock source function that does not collect any stream record and finishes on demand. 
*/ + private static class IdlingSourceFunction<T> extends RichSourceFunction<T> + implements ParallelSourceFunction<T> { + private boolean isCancelled = false; + + @Override + public void run(SourceContext<T> ctx) throws Exception { + while (!isCancelled && !shouldCloseSource.get()) { + Thread.sleep(100); + } + } + + @Override + public void cancel() { + isCancelled = true; + } + } + + /** + * A wrapper operator factory for {@link EventReceivingOperator} and {@link + * EventSendingCoordinator}. + */ + private static class EventReceivingOperatorFactory<IN, OUT> + extends AbstractStreamOperatorFactory<OUT> + implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> { + private final String name; + protected final int numEvents; + private final int delay; + + public EventReceivingOperatorFactory(String name, int numEvents, int delay) { + this.name = name; + this.numEvents = numEvents; + this.delay = delay; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new OperatorCoordinator.Provider() { + + @Override + public OperatorID getOperatorId() { + return operatorID; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return new EventSendingCoordinator(context, name, numEvents, delay); + } + }; + } + + @Override + public <T extends StreamOperator<OUT>> T createStreamOperator( + StreamOperatorParameters<OUT> parameters) { + EventReceivingOperator<OUT> operator = new EventReceivingOperator<>(); + operator.setup( + parameters.getContainingTask(), + parameters.getStreamConfig(), + parameters.getOutput()); + parameters + .getOperatorEventDispatcher() + .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator); + return (T) operator; + } + + @Override + public Class<? 
extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + return EventReceivingOperator.class; + } + } + + /** + * The stream operator that receives the events and accumulates the numbers. The task is + * stateful and checkpoints the accumulator. + */ + private static class EventReceivingOperator<T> extends AbstractStreamOperator<T> + implements OneInputStreamOperator<T, T>, OperatorEventHandler { + protected static final String COUNTER_NAME = "numEvents"; + + protected final LongCounter counter = new LongCounter(); + + protected ListState<Long> state; + + @Override + public void open() throws Exception { + super.open(); + getRuntimeContext().addAccumulator(COUNTER_NAME, counter); + } + + @Override + public void processElement(StreamRecord<T> element) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void handleOperatorEvent(OperatorEvent evt) { + if (evt instanceof IntegerEvent) { + counter.add(1L); + } else if (evt instanceof EndEvent) { + try { + state.update(Collections.singletonList(counter.getLocalValue())); + } catch (Exception e) { + throw new RuntimeException(e); + } + shouldCloseSource.set(true); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + state.update(Collections.singletonList(counter.getLocalValue())); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + state = + context.getOperatorStateStore() + .getListState( + new ListStateDescriptor<>( + "counterState", BasicTypeInfo.LONG_TYPE_INFO)); + + counter.resetLocal(); + Iterator<Long> iterator = state.get().iterator(); + if (iterator.hasNext()) { + counter.add(iterator.next()); + } + Preconditions.checkArgument(!iterator.hasNext()); + + // signal the coordinator to start + getContainingTask() + .getEnvironment() + 
.getOperatorCoordinatorEventGateway() + .sendOperatorEventToCoordinator( + getOperatorID(), + new SerializedValue<>( + new StartEvent(counter.getLocalValue().intValue() - 1))); + } + } + + private static class FailingCheckpointOperatorFactory<IN, OUT> Review Comment: Should we add a Javadoc for this class, similar to how `EventReceivingOperatorFactory` is documented? ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -392,8 +427,12 @@ public void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumb } private void setupSubtaskGateway(final SubtaskAccess sta) { - final OperatorCoordinator.SubtaskGateway gateway = - new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor, unconfirmedEvents); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents); + + if (!context.isConcurrentExecutionAttemptsSupported()) { Review Comment: nit: would it be useful to add the following comment? `When concurrent execution attempts are supported, checkpointing must have been disabled, and thus we don't need to maintain subtaskGatewayMap.` ########## flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask; +import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link SubtaskGatewayImpl}. 
*/ +public class SubtaskGatewayImplTest { + + @Test + public void eventsPassThroughOpenGateway() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(11)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + final OperatorEvent event = new TestOperatorEvent(); + final CompletableFuture<Acknowledge> future = gateway.sendEvent(event); + + assertThat(receiver.events).containsExactly(new EventWithSubtask(event, 11)); + assertThat(future).isDone(); + } + + @Test + public void closingMarkedGateway() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(11)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + gateway.markForCheckpoint(200L); + final boolean isClosed = gateway.tryCloseGateway(200L); + + assertThat(isClosed).isTrue(); + } + + @Test + public void notClosingUnmarkedGateway() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(11)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + final boolean isClosed = gateway.tryCloseGateway(123L); + + assertThat(isClosed).isFalse(); + } + + @Test + public void notClosingGatewayForOtherMark() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(11)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + gateway.markForCheckpoint(100L); + final boolean isClosed 
= gateway.tryCloseGateway(123L); + + assertThat(isClosed).isFalse(); + } + + @Test + public void eventsBlockedByClosedGateway() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(1)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + gateway.markForCheckpoint(1L); + gateway.tryCloseGateway(1L); + + final CompletableFuture<Acknowledge> future = gateway.sendEvent(new TestOperatorEvent()); + + assertThat(receiver.events).isEmpty(); + assertThat(future).isNotDone(); + } + + @Test + public void eventsReleasedAfterOpeningGateway() { + final EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks(); + final SubtaskGatewayImpl gateway0 = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(0)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + final SubtaskGatewayImpl gateway3 = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(3)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + List<SubtaskGatewayImpl> gateways = Arrays.asList(gateway3, gateway0); + + gateways.forEach(x -> x.markForCheckpoint(17L)); + gateways.forEach(x -> x.tryCloseGateway(17L)); + + final OperatorEvent event1 = new TestOperatorEvent(); + final OperatorEvent event2 = new TestOperatorEvent(); + final CompletableFuture<Acknowledge> future1 = gateway3.sendEvent(event1); + final CompletableFuture<Acknowledge> future2 = gateway0.sendEvent(event2); + + gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint); + + assertThat(receiver.events) + .containsExactly(new EventWithSubtask(event1, 3), new EventWithSubtask(event2, 0)); + assertThat(future1).isDone(); + assertThat(future2).isDone(); + } + + @Test + public void 
releasedEventsForwardSendFailures() { + final EventReceivingTasks receiver = + EventReceivingTasks.createForRunningTasksFailingRpcs(new FlinkException("test")); + final SubtaskGatewayImpl gateway = + new SubtaskGatewayImpl( + getUniqueElement(receiver.getAccessesForSubtask(10)), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new IncompleteFuturesTracker()); + + gateway.markForCheckpoint(17L); + gateway.tryCloseGateway(17L); + + final CompletableFuture<Acknowledge> future = gateway.sendEvent(new TestOperatorEvent()); + gateway.openGatewayAndUnmarkCheckpoint(); + + assertThat(future).isCompletedExceptionally(); + } + + private static final class RejectingSubtaskGateway Review Comment: Is this class used? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import 
org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent around + * checkpoint. + * + * <p>This class is an extension to {@link + * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further + * verifying the exactly-once semantics of events when the flink job is constructed using actual Review Comment: It is still not clear what behavior this class intends to test, or how that behavior differs from what `CoordinatorEventsExactlyOnceITCase` tests. For example, it is not clear what the problem is with NOT using actual stream operators, or what `the correctness of the behavior of stream operators` actually means. The Javadoc of `CoordinatorEventsExactlyOnceITCase` has detailed comments explaining the behavior it is trying to verify. Can you follow that example and explain what behavior this test is trying to verify? Also note that the expected behavior described in the Javadoc of `CoordinatorEventsExactlyOnceITCase` is now outdated. It says `After a coordinator completed its checkpoint future, all events sent after that must be held back until the checkpoint barriers have been sent to the sources`, which is no longer sufficient for achieving exactly-once semantics. This probably also needs to be updated in this PR.