rkhachatryan commented on a change in pull request #17645:
URL: https://github.com/apache/flink/pull/17645#discussion_r741869256



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
##########
@@ -149,8 +149,11 @@ private void ensureRunning() throws Exception {
         // checking before is not enough because (check + enqueue) is not atomic
         if (wasClosed || !thread.isAlive()) {
             cleanupRequests();
-            throw ExceptionUtils.firstOrSuppressed(
-                    new IllegalStateException("not running"), thrown);
+            IllegalStateException exception = new IllegalStateException("not running");

Review comment:
       `firstOrSuppressed` is used a lot, while this case is probably not so widespread, so I think it's better to keep `ExceptionUtils` simpler. WDYT?
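For context, Flink's `ExceptionUtils.firstOrSuppressed(newException, previous)` returns `previous` (with `newException` added as suppressed) whenever `previous` is non-null, so the removed line threw the earlier `thrown` failure rather than the "not running" error. The sketch below contrasts that with one plausible inline replacement. The class and method names are hypothetical, and because the diff is truncated it is only an assumption that the new code keeps the `IllegalStateException` as the primary exception and attaches `thrown` as suppressed.

```java
import java.util.Objects;

/** Hypothetical helpers contrasting the old and (assumed) new way of building the error. */
final class NotRunningErrors {
    private NotRunningErrors() {}

    /** Same semantics as Flink's ExceptionUtils.firstOrSuppressed: keep the first, suppress the new one. */
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        Objects.requireNonNull(newException, "newException");
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Assumed inline variant: "not running" is always primary; the earlier failure is attached. */
    static IllegalStateException notRunning(Throwable thrown) {
        IllegalStateException exception = new IllegalStateException("not running");
        if (thrown != null) {
            exception.addSuppressed(thrown);
        }
        return exception;
    }
}
```

With a non-null `thrown`, `firstOrSuppressed(new IllegalStateException("not running"), thrown)` returns `thrown` itself, whereas `notRunning(thrown)` returns the `IllegalStateException` with `thrown` in `getSuppressed()`. Handling this at the single call site leaves `ExceptionUtils` unchanged.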

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
##########
@@ -221,9 +226,11 @@ public void notifyCheckpointAborted(long checkpointId) {
             @Nonnull CheckpointOptions checkpointOptions) {
         return new FutureTask<>(
                 () ->
-                        SnapshotResult.of(
-                                new MockKeyedStateHandle<>(
-                                        copy(stateValues, stateSnapshotFilters))));
+                        emptySnapshot

Review comment:
       A non-empty snapshot can't be handled by the JM because `MockKeyedStateHandle` can't be serialized by `MetadataSerializer`. Adding a branch to (de)serialize a mock state handle to production code doesn't seem right to me, so I decided to return an empty snapshot instead.
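The idea can be sketched in plain Java as a snapshot task that always completes with an empty result, so nothing that the JM would have to serialize is ever produced. `SimpleSnapshotResult` and `EmptySnapshotBackend` are hypothetical stand-ins for Flink's `SnapshotResult` and the mock backend; in the actual diff the precomputed value is the `emptySnapshot` field, whose definition is not shown.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical stand-in for a snapshot result; a null handle means "nothing to persist". */
final class SimpleSnapshotResult<T> {
    private static final SimpleSnapshotResult<?> EMPTY = new SimpleSnapshotResult<>(null);

    private final T handle;

    private SimpleSnapshotResult(T handle) {
        this.handle = handle;
    }

    @SuppressWarnings("unchecked")
    static <T> SimpleSnapshotResult<T> empty() {
        return (SimpleSnapshotResult<T>) EMPTY;
    }

    boolean isEmpty() {
        return handle == null;
    }
}

/** Hypothetical mock backend: snapshots never contain a (non-serializable) mock handle. */
final class EmptySnapshotBackend {
    private EmptySnapshotBackend() {}

    static <T> RunnableFuture<SimpleSnapshotResult<T>> snapshot() {
        // the task completes with the shared empty result instead of copying mock state
        return new FutureTask<SimpleSnapshotResult<T>>(() -> SimpleSnapshotResult.<T>empty());
    }
}
```

The trade-off is that such snapshots carry no keyed state, which fits the ITCase's use of a "non-snapshotting backend" to focus on channel state persistence.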

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -296,13 +296,17 @@ private void runWithChecks(RunnableWithException r) throws Exception {
             r.run();
         } catch (Exception e) {
             fail(e);
-            throw e;
         }
     }
 
     public void fail(Throwable e) throws Exception {

Review comment:
       Good point, thanks, I'll do this.
   
   Though I'm going to partially revert this change: otherwise, programming and other errors aren't propagated (that's why `ChannelStateWriteRequestDispatcherTest` fails). They don't have to be declared, though, since they are non-recoverable by definition.
   
   Instead, I'm going to add something like
   ```
               fail(e);
               // IOExceptions are only recorded via fail(e); everything else is propagated
               if (!ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
                   ExceptionUtils.rethrow(e);
               }
   ```
   (but first I'd like to confirm that the original exception was indeed an `IOException`)
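A plain-JDK sketch of that catch-block logic, for illustration only: `FailurePropagation`, `findThrowable` and `handle` are hypothetical names, and the helper merely mimics the cause-chain search of Flink's `ExceptionUtils.findThrowable` and the wrap-if-checked behaviour of `ExceptionUtils.rethrow`. It assumes, as the comment does, that an `IOException` anywhere in the cause chain marks a failure for which `fail(e)` alone is enough.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical sketch of "record the failure, then propagate unless it is an IO failure". */
final class FailurePropagation {
    private FailurePropagation() {}

    /** Walks the cause chain and returns the first throwable of the given type, if any. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> type) {
        Throwable current = throwable;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /**
     * Always passes the exception to {@code fail}; IO failures stop there, anything else
     * (e.g. programming errors) is rethrown, with checked exceptions wrapped as unchecked.
     */
    static void handle(Exception e, Consumer<? super Exception> fail) {
        fail.accept(e);
        if (!findThrowable(e, IOException.class).isPresent()) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        }
    }
}
```

`java.lang.Error`s never reach this path, because the `catch (Exception e)` in `runWithChecks` does not catch them; they propagate without being declared.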

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.api.common.time.Deadline.fromNow;
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * Tests failure handling in channel state persistence.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-24667">FLINK-24667</a>
+ */
+public class UnalignedCheckpointFailureHandlingITCase {
+
+    private static final int PARALLELISM = 2;
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(PARALLELISM)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @Test
+    public void testCheckpointSuccessAfterFailure() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        TestCheckpointStorage storage =
+                new TestCheckpointStorage(
+                        new JobManagerCheckpointStorage(), sharedObjects, temporaryFolder);
+
+        configure(env, storage);
+        buildGraph(env);
+
+        JobClient jobClient = env.executeAsync();
+        JobID jobID = jobClient.getJobID();
+        MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+        waitForJobStatus(jobClient, singletonList(RUNNING), fromNow(Duration.ofSeconds(30)));
+        waitForAllTaskRunning(miniCluster, jobID, false);
+
+        triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
+
+        miniCluster.triggerCheckpoint(jobID).get();
+    }
+
+    private void configure(StreamExecutionEnvironment env, TestCheckpointStorage storage) {
+        // enable checkpointing but only via API
+        env.enableCheckpointing(Long.MAX_VALUE, CheckpointingMode.EXACTLY_ONCE);
+
+        env.getCheckpointConfig().setCheckpointStorage(storage);
+
+        // use non-snapshotting backend to test channel state persistence integration with
+        // checkpoint storage
+        env.setStateBackend(new MockStateBackend(true));
+
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+
+        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO); // speed-up
+
+        // failures are emitted by the storage
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
+
+        // DoP > 1 is required for some barriers to lag
+        env.setParallelism(PARALLELISM);
+
+        // no chaining to have input channels (doesn't matter local or remote)
+        env.disableOperatorChaining();
+    }
+
+    private void buildGraph(StreamExecutionEnvironment env) {
+        // with zero alignment timeout some steps here are not strictly necessary currently, but
+        // this may change in the future
+        env.fromSource(
+                        new NumberSequenceSource(0, Long.MAX_VALUE),
+                        WatermarkStrategy.noWatermarks(),
+                        "num-source")
+                // source is not parallel, so keyBy to send to all down-streams
+                .keyBy(value -> value)
+                // exert back-pressure
+                .map(
+                        value -> {
+                            Thread.sleep(0, 100);

Review comment:
       I think it's platform-specific; I don't mind changing it to 1ms, though.
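Whether `Thread.sleep(0, 100)` really pauses for about 100 ns depends on the JDK and the OS timer; JDK 8's implementation of `Thread.sleep(long, int)`, for example, rounds a non-zero `nanos` with zero `millis` up to a full millisecond. The hypothetical helper below (not part of the PR) measures the effective duration on a given machine so the two options can be compared; it relies only on the standard `Thread.sleep(long, int)` and `System.nanoTime()`.

```java
/**
 * Hypothetical helper: measures how long Thread.sleep(millis, nanos) actually blocks
 * on the current JVM/OS, e.g. to compare sleep(0, 100) with sleep(1).
 */
final class SleepGranularity {
    private SleepGranularity() {}

    /** Average wall-clock duration of one sleep call, in nanoseconds. */
    static long averageSleepNanos(long millis, int nanos, int iterations) throws InterruptedException {
        if (iterations <= 0) {
            throw new IllegalArgumentException("iterations must be positive");
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            Thread.sleep(millis, nanos);
        }
        return (System.nanoTime() - start) / iterations;
    }
}
```

Printing `averageSleepNanos(0, 100, 1_000)` next to `averageSleepNanos(1, 0, 1_000)` shows how far apart the two are on the current platform; an explicit `Thread.sleep(1)` in the map function makes the intended back-pressure independent of that.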

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
