lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r930593185


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
         }
     }
 
+    private boolean closeGateways(
+            final long checkpointId, final Set<Integer> subtasksToCheckpoint) {
+        boolean hasCloseableGateway = false;
+        for (int subtask : subtasksToCheckpoint) {
+            SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+            if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
   If we do want to capture that bug, how about we consistently call 
`tryCloseGateway()` for every subtask and check the condition at the end of 
the loop, instead of stopping at the first subtask whose `tryCloseGateway()` 
returns false?
   
   Otherwise, if `subtask_1.tryCloseGateway() == false` and 
`subtask_2.tryCloseGateway() == true`, the method will not throw an exception, 
which makes its behavior harder to understand.
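
   A minimal sketch of that alternative, assuming the bug to capture is a 
mixed outcome (some gateways close, others do not). `Gateway` is a 
hypothetical stand-in for `SubtaskGatewayImpl`, and the exception type and 
message are illustrative only:

```java
import java.util.Map;
import java.util.Set;

public class CloseGatewaysSketch {

    /** Hypothetical stand-in for SubtaskGatewayImpl; only the method under discussion. */
    public interface Gateway {
        boolean tryCloseGateway(long checkpointId);
    }

    /**
     * Calls tryCloseGateway() on every subtask, then checks the outcome once
     * after the loop instead of stopping at the first failure.
     */
    public static boolean closeGateways(
            Map<Integer, Gateway> gateways, Set<Integer> subtasks, long checkpointId) {
        int closed = 0;
        for (int subtask : subtasks) {
            if (gateways.get(subtask).tryCloseGateway(checkpointId)) {
                closed++;
            }
        }
        // Assumption: a mix of closed and non-closed gateways indicates the bug.
        if (closed != 0 && closed != subtasks.size()) {
            throw new IllegalStateException(
                    "Only " + closed + " of " + subtasks.size() + " gateways could be closed.");
        }
        return closed > 0;
    }
}
```

   With this shape the result no longer depends on the iteration order of the 
subtasks.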
   
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java:
##########
@@ -18,20 +18,16 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
+/**
+ * An {@link OperatorEvent} sent from a subtask to its {@link OperatorCoordinator} to signal that
+ * the checkpoint of an individual task is completed.
+ */
+public class AcknowledgeCheckpointEvent implements OperatorEvent {
 
-/** Simple interface for a component that takes and sends events. */
-@FunctionalInterface
-interface EventSender {
+    /** The ID of the checkpoint that this event is related to. */
+    final long checkpointId;

Review Comment:
   Is there any existing example where we make a field package-private (no 
access modifier) and do not provide an accessor method for it?
   
   If not, it seems better to keep the code style consistent with the existing 
code.
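
   For illustration, a sketch of the private-field-plus-accessor style. The 
class name is hypothetical, and the `OperatorEvent` interface is left out so 
that the snippet stays self-contained:

```java
/** Sketch only: the real event class would implement OperatorEvent. */
public class AcknowledgeCheckpointEventSketch {

    /** The ID of the checkpoint that this event is related to. */
    private final long checkpointId;

    public AcknowledgeCheckpointEventSketch(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    /** Accessor, so that callers never touch the field directly. */
    public long getCheckpointId() {
        return checkpointId;
    }
}
```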



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase.EndEvent;
+import static org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator;
+import static org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase.IntegerEvent;
+import static org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase.StartEvent;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link
+ * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events when the flink job is constructed using actual
+ * stream operators.
+ */
+public class CoordinatorEventsExactlyOnceTest {
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false);
+
+    private static final int numEvents = 100;
+
+    private static final int delay = 1;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource.set(false);
+    }
+
+    @Test
+    public void test() throws Exception {

Review Comment:
   Could we give this test a more meaningful name?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java:
##########
@@ -0,0 +1,350 @@
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link
+ * org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events when the flink job is constructed using actual
+ * stream operators.
+ */
+public class CoordinatorEventsExactlyOnceTest {

Review Comment:
   Could you explain why we add a new class here instead of adding tests to 
`CoordinatorEventsExactlyOnceITCase`?
   
   If we do need a new class, would you update the class names and docs to 
clarify the difference?
   
   In general we want a test class name to explain its own purpose. It is not 
clear what the difference is between `CoordinatorEventsExactlyOnceITCase` and 
`CoordinatorEventsExactlyOnceTest`, or which class should be used for new 
tests in the future.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -229,6 +233,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
+        Preconditions.checkState(!context.isConcurrentExecutionAttemptsSupported());

Review Comment:
   We typically check a condition only where the program would produce an 
incorrect result if the condition were not met. Otherwise such checks pile up 
and make the program unnecessarily inefficient over time.
   
   Could you double check that we do need to check *this condition* at *this 
place*?
   
   The same applies to the other `Preconditions.checkState` calls added in 
this PR.
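
   As a rough illustration of the kind of check that does earn its place, 
here is a sketch with hypothetical names (none of them are from this PR). The 
local `checkState` helper stands in for `Preconditions.checkState`, and the 
guarded invariant is an assumption chosen for the example: if it were 
violated, later logic would silently produce wrong results.

```java
public class CheckStateSketch {

    /** Local stand-in for a precondition helper; throws if the condition is false. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    private long latestCheckpointId = -1L;

    /**
     * Assumed invariant for this sketch: checkpoint IDs strictly increase.
     * Accepting a stale ID would corrupt the bookkeeping, so the check guards
     * correctness rather than restating something already guaranteed elsewhere.
     */
    public void startCheckpoint(long checkpointId) {
        checkState(
                checkpointId > latestCheckpointId,
                "Checkpoint " + checkpointId + " is not newer than " + latestCheckpointId);
        latestCheckpointId = checkpointId;
    }

    public long getLatestCheckpointId() {
        return latestCheckpointId;
    }
}
```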



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsExactlyOnceTest.java:
##########
@@ -0,0 +1,350 @@
+    @Test
+    public void test() throws Exception {
+        long count =
+                executeAndGetNumReceivedEvents(
+                        env,
+                        new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+        assertThat(count).isEqualTo(numEvents);
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        long count =
+                executeAndGetNumReceivedEvents(
+                        env,
+                        new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+        assertThat(count).isEqualTo(numEvents);
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {

Review Comment:
   What condition does this test check?


