This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 600cde4  [FLINK-24303][coordination] Failure when creating a source enumerator lead to full failover, not JobManager failure.
600cde4 is described below

commit 600cde46a72bd78ac3aefffde7ae936e57624131
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 20 02:59:02 2021 +0200

    [FLINK-24303][coordination] Failure when creating a source enumerator lead to full failover, not JobManager failure.
    
    Instead of letting exceptions during the creation of the Source Enumerator bubble up (and ultimately fail
    the JobManager / Scheduler creation), we now catch those exceptions and trigger a full (global) failover
    for that case.
---
 .../source/reader/CoordinatedSourceITCase.java     | 164 +++++++++++++++++++++
 .../source/coordinator/SourceCoordinator.java      |  20 ++-
 .../source/coordinator/SourceCoordinatorTest.java  |  20 +++
 3 files changed, 201 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 158ec94..4c23973 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -20,17 +20,26 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.accumulators.ListAccumulator;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -60,6 +69,34 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
         executeAndVerify(env, stream1.union(stream2), 40);
     }
 
+    @Test
+    public void testEnumeratorCreationFails() throws Exception {
+        OnceFailingToCreateEnumeratorSource.reset();
+        // The first createEnumerator() call throws; the job must recover and emit all records.
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+        final Source<Integer, ?, ?> source =
+                new OnceFailingToCreateEnumeratorSource(2, 10, Boundedness.BOUNDED);
+        final DataStream<Integer> stream =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
+        executeAndVerify(env, stream, 20);
+    }
+
+    @Test
+    public void testEnumeratorRestoreFails() throws Exception {
+        OnceFailingToRestoreEnumeratorSource.reset();
+        // The first restoreEnumerator() call throws; the job must recover and emit all records.
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+        env.enableCheckpointing(10);
+
+        final Source<Integer, ?, ?> source =
+                new OnceFailingToRestoreEnumeratorSource(2, 10, Boundedness.BOUNDED);
+        final DataStream<Integer> stream =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
+        executeAndVerify(env, stream, 20);
+    }
+
     @SuppressWarnings("serial")
     private void executeAndVerify(
             StreamExecutionEnvironment env, DataStream<Integer> stream, int 
numRecords)
@@ -83,4 +120,131 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
         assertEquals(0, (int) result.get(0));
         assertEquals(numRecords - 1, (int) result.get(result.size() - 1));
     }
+
+    // ------------------------------------------------------------------------
+
+    private static class OnceFailingToCreateEnumeratorSource extends MockBaseSource {
+        // The first createEnumerator() call throws; later calls delegate to MockBaseSource.
+        private static final long serialVersionUID = 1L;
+        private static volatile boolean hasFailed;
+
+        OnceFailingToCreateEnumeratorSource(
+                int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
+            super(numSplits, numRecordsPerSplit, boundedness);
+        }
+
+        @Override
+        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
+                SplitEnumeratorContext<MockSourceSplit> enumContext) {
+            if (!hasFailed) {
+                hasFailed = true;
+                throw new FlinkRuntimeException("Test Failure");
+            }
+
+            return super.createEnumerator(enumContext);
+        }
+
+        static void reset() {
+            hasFailed = false;
+        }
+    }
+
+    /**
+     * A source with the following behavior:
+     *
+     * <ol>
+     *   <li>It initially creates an enumerator that does not assign work, waits until the first
+     *       checkpoint completes (which contains all work, because none is assigned yet) and then
+     *       triggers a global failure.
+     *   <li>Upon restoring from the failure, the first attempt to restore the enumerator fails with
+     *       an exception.
+     *   <li>The next attempt to restore the enumerator succeeds and the enumerator works regularly.
+     * </ol>
+     */
+    private static class OnceFailingToRestoreEnumeratorSource extends MockBaseSource {
+
+        private static final long serialVersionUID = 1L;
+        private static volatile boolean hasFailed;
+
+        OnceFailingToRestoreEnumeratorSource(
+                int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
+            super(numSplits, numRecordsPerSplit, boundedness);
+        }
+
+        @Override
+        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
+                SplitEnumeratorContext<MockSourceSplit> enumContext) {
+
+            final SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> enumerator =
+                    super.createEnumerator(enumContext);
+
+            if (hasFailed) {
+                // after the failure happened, we proceed normally
+                return enumerator;
+            } else {
+                // before the failure, we use an enumerator that never assigns any splits
+                try {
+                    final List<MockSourceSplit> splits = enumerator.snapshotState(1L);
+                    return new NonAssigningEnumerator(splits, enumContext);
+                } catch (Exception e) {
+                    throw new FlinkRuntimeException(e.getMessage(), e);
+                }
+            }
+        }
+
+        @Override
+        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(
+                SplitEnumeratorContext<MockSourceSplit> enumContext,
+                List<MockSourceSplit> checkpoint)
+                throws IOException {
+            if (!hasFailed) {
+                hasFailed = true;
+                throw new FlinkRuntimeException("Test Failure");
+            }
+
+            return super.restoreEnumerator(enumContext, checkpoint);
+        }
+
+        static void reset() {
+            hasFailed = false;
+        }
+
+        /**
+         * This enumerator does not assign work, so all state is in the checkpoint. After the first
+         * checkpoint is complete, it triggers a global failure.
+         */
+        private static class NonAssigningEnumerator extends MockSplitEnumerator {
+
+            private final SplitEnumeratorContext<?> context;
+
+            NonAssigningEnumerator(
+                    List<MockSourceSplit> splits, SplitEnumeratorContext<MockSourceSplit> context) {
+                super(splits, context);
+                this.context = context;
+            }
+
+            @Override
+            public void addReader(int subtaskId) {
+                // we do nothing here to make sure there is no progress
+            }
+
+            @Override
+            public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+                // we do nothing here to make sure there is no progress
+            }
+
+            @Override
+            public void notifyCheckpointComplete(long checkpointId) throws Exception {
+                // This is a bit of a clumsy way to trigger a global failover from a coordinator.
+                // This is safe, though, because per the contract, exceptions in the enumerator
+                // handlers trigger a global failover.
+                context.callAsync(
+                        () -> null,
+                        (success, failure) -> {
+                            throw new FlinkRuntimeException(
+                                    "Artificial trigger for Global Failover");
+                        });
+            }
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index add85bb..5ba4160 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -111,6 +111,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     public void start() throws Exception {
         LOG.info("Starting split enumerator for source {}.", operatorName);
 
+        // we mark this as started first, so that we can later distinguish the 
cases where
+        // 'start()' wasn't called and where 'start()' failed.
+        started = true;
+
         // there are two ways the coordinator can get created:
         //  (1) Source.restoreEnumerator(), in which case the 
'resetToCheckpoint()' method creates
         // it
@@ -122,13 +126,17 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             try (TemporaryClassLoaderContext ignored =
                     TemporaryClassLoaderContext.of(userCodeClassLoader)) {
                 enumerator = source.createEnumerator(context);
+            } catch (Throwable t) {
+                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                LOG.error("Failed to create Source Enumerator for source {}", 
operatorName, t);
+                context.failJob(t);
+                return;
             }
         }
 
         // The start sequence is the first task in the coordinator executor.
         // We rely on the single-threaded coordinator executor to guarantee
         // the other methods are invoked after the enumerator has started.
-        started = true;
         runInEventLoop(() -> enumerator.start(), "starting the 
SplitEnumerator.");
     }
 
@@ -309,6 +317,14 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             final Object... actionNameFormatParameters) {
 
         ensureStarted();
+
+        // we may end up here even for a non-started enumerator, in case the 
instantiation
+        // failed, and we get the 'subtaskFailed()' notification during the 
failover.
+        // we need to ignore those.
+        if (enumerator == null) {
+            return;
+        }
+
         coordinatorExecutor.execute(
                 () -> {
                     try {
@@ -410,7 +426,5 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         if (!started) {
             throw new IllegalStateException("The coordinator has not started 
yet.");
         }
-
-        assert enumerator != null;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 0b62215..b562643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -255,6 +255,26 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
     }
 
     @Test
+    public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception {
+        final RuntimeException failureReason = new RuntimeException("Artificial Exception");
+
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        coordinatorExecutor,
+                        new EnumeratorCreatingSource<>(
+                                () -> {
+                                    throw failureReason;
+                                }),
+                        context);
+        // start() must not throw; it has to fail the job with 'failureReason' instead.
+        coordinator.start();
+
+        assertTrue(operatorCoordinatorContext.isJobFailed());
+        assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason());
+    }
+
+    @Test
     public void testErrorThrownFromSplitEnumerator() throws Exception {
         final Error error = new Error("Test Error");
 

Reply via email to