Repository: flink
Updated Branches:
  refs/heads/master 009146c7e -> 77348858f


[FLINK-3050] [runtime] Add UnrecoverableException to suppress job restarts

This closes #1461.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebbc85da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebbc85da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebbc85da

Branch: refs/heads/master
Commit: ebbc85da7b78c1acdac4d184bddd6dfecf1338b2
Parents: 009146c
Author: Ufuk Celebi <u...@apache.org>
Authored: Wed Dec 16 14:09:22 2015 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Mon Jan 11 16:30:25 2016 +0100

----------------------------------------------------------------------
 .../execution/UnrecoverableException.java       | 37 ++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 11 +++-
 .../ExecutionGraphRestartTest.java              | 63 +++++++++++++++++++-
 3 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
new file mode 100644
index 0000000..5a6cd7e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+/**
+ * Exception thrown on unrecoverable failures.
+ *
+ * <p>This exception acts as a wrapper around the real cause and suppresses
+ * job restarts. The JobManager will <strong>not</strong> restart a job that
+ * fails with this exception.
+ */
+public class UnrecoverableException extends RuntimeException {
+
+       private static final long serialVersionUID = 221873676920848349L;
+
+       public UnrecoverableException(Throwable cause) {
+               super("Unrecoverable failure. This suppresses job restarts. Please check the " +
+                               "stack trace for the root cause.", cause);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 663f588..9767968 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.UnrecoverableException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -937,7 +938,11 @@ public class ExecutionGraph implements Serializable {
                                                }
                                        }
                                        else if (current == JobStatus.FAILING) {
-                                               if (numberOfRetriesLeft > 0 && 
transitionState(current, JobStatus.RESTARTING)) {
+                                               boolean isRecoverable = 
!(failureCause instanceof UnrecoverableException);
+
+                                               if (isRecoverable && 
numberOfRetriesLeft > 0 &&
+                                                               
transitionState(current, JobStatus.RESTARTING)) {
+
                                                        numberOfRetriesLeft--;
                                                        
                                                        if (delayBeforeRetrying 
> 0) {
@@ -966,7 +971,9 @@ public class ExecutionGraph implements Serializable {
                                                        }
                                                        break;
                                                }
-                                               else if (numberOfRetriesLeft <= 
0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+                                               else if ((!isRecoverable || 
numberOfRetriesLeft <= 0) &&
+                                                               
transitionState(current, JobStatus.FAILED, failureCause)) {
+
                                                        postRunCleanup();
                                                        break;
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index a50aa2e..127ae33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.UnrecoverableException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,17 +38,20 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 public class ExecutionGraphRestartTest {
 
        private final static int NUM_TASKS = 31;
 
        @Test
-       public void testNotRestartManually() throws Exception {
+       public void testNoManualRestart() throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
                                new 
SimpleActorGateway(TestingUtils.directExecutionContext()),
                                NUM_TASKS);
@@ -83,6 +87,7 @@ public class ExecutionGraphRestartTest {
 
                assertEquals(JobStatus.FAILED, eg.getState());
 
+               // This should not restart the graph.
                eg.restart();
 
                assertEquals(JobStatus.FAILED, eg.getState());
@@ -299,4 +304,60 @@ public class ExecutionGraphRestartTest {
 
                assertEquals(JobStatus.CANCELED, executionGraph.getState());
        }
+
+       @Test
+       public void testNoRestartOnUnrecoverableException() throws Exception {
+               Instance instance = ExecutionGraphTestUtils.getInstance(
+                               new 
SimpleActorGateway(TestingUtils.directExecutionContext()),
+                               NUM_TASKS);
+
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+               scheduler.newInstanceAvailable(instance);
+
+               JobVertex sender = new JobVertex("Task");
+               sender.setInvokableClass(Tasks.NoOpInvokable.class);
+               sender.setParallelism(NUM_TASKS);
+
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+               ExecutionGraph eg = spy(new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               new JobID(),
+                               "Test job",
+                               new Configuration(),
+                               AkkaUtils.getDefaultTimeout()));
+
+               eg.setNumberOfRetriesLeft(1);
+               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+               assertEquals(JobStatus.CREATED, eg.getState());
+
+               eg.scheduleForExecution(scheduler);
+
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               // Fail with unrecoverable Exception
+               eg.getAllExecutionVertices().iterator().next().fail(
+                               new UnrecoverableException(new Exception("Test Exception")));
+
+               assertEquals(JobStatus.FAILING, eg.getState());
+
+               for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
+               }
+
+               FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+
+               // Wait for the job to reach the FAILED state (no restart is expected)
+               Deadline deadline = timeout.fromNow();
+               while (deadline.hasTimeLeft() && eg.getState() != 
JobStatus.FAILED) {
+                       Thread.sleep(100);
+               }
+
+               assertEquals(JobStatus.FAILED, eg.getState());
+
+               // No restart
+               verify(eg, never()).restart();
+               assertEquals(1, eg.getNumberOfRetriesLeft());
+       }
 }

Reply via email to