Repository: flink Updated Branches: refs/heads/master 009146c7e -> 77348858f
[FLINK-3050] [runtime] Add UnrecoverableException to suppress job restarts This closes #1461. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebbc85da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebbc85da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebbc85da Branch: refs/heads/master Commit: ebbc85da7b78c1acdac4d184bddd6dfecf1338b2 Parents: 009146c Author: Ufuk Celebi <u...@apache.org> Authored: Wed Dec 16 14:09:22 2015 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Mon Jan 11 16:30:25 2016 +0100 ---------------------------------------------------------------------- .../execution/UnrecoverableException.java | 37 ++++++++++++ .../runtime/executiongraph/ExecutionGraph.java | 11 +++- .../ExecutionGraphRestartTest.java | 63 +++++++++++++++++++- 3 files changed, 108 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java new file mode 100644 index 0000000..5a6cd7e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution; + +/** + * Exception thrown on unrecoverable failures. + * + * <p>This exception acts as a wrapper around the real cause and suppresses + * job restarts. The JobManager will <strong>not</strong> restart a job that + * fails with this exception. + */ +public class UnrecoverableException extends RuntimeException { + + private static final long serialVersionUID = 221873676920848349L; + + public UnrecoverableException(Throwable cause) { + super("Unrecoverable failure. This suppresses job restarts. Please check the " + + "stack trace for the root cause.", cause); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 663f588..9767968 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; +import 
org.apache.flink.runtime.execution.UnrecoverableException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -937,7 +938,11 @@ public class ExecutionGraph implements Serializable { } } else if (current == JobStatus.FAILING) { - if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { + boolean isRecoverable = !(failureCause instanceof UnrecoverableException); + + if (isRecoverable && numberOfRetriesLeft > 0 && + transitionState(current, JobStatus.RESTARTING)) { + numberOfRetriesLeft--; if (delayBeforeRetrying > 0) { @@ -966,7 +971,9 @@ public class ExecutionGraph implements Serializable { } break; } - else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { + else if ((!isRecoverable || numberOfRetriesLeft <= 0) && + transitionState(current, JobStatus.FAILED, failureCause)) { + postRunCleanup(); break; } http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index a50aa2e..127ae33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.UnrecoverableException; import 
org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -37,17 +38,20 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class ExecutionGraphRestartTest { private final static int NUM_TASKS = 31; @Test - public void testNotRestartManually() throws Exception { + public void testNoManualRestart() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( new SimpleActorGateway(TestingUtils.directExecutionContext()), NUM_TASKS); @@ -83,6 +87,7 @@ public class ExecutionGraphRestartTest { assertEquals(JobStatus.FAILED, eg.getState()); + // This should not restart the graph. 
eg.restart(); assertEquals(JobStatus.FAILED, eg.getState()); @@ -299,4 +304,60 @@ public class ExecutionGraphRestartTest { assertEquals(JobStatus.CANCELED, executionGraph.getState()); } + + @Test + public void testNoRestartOnUnrecoverableException() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex sender = new JobVertex("Task"); + sender.setInvokableClass(Tasks.NoOpInvokable.class); + sender.setParallelism(NUM_TASKS); + + JobGraph jobGraph = new JobGraph("Pointwise job", sender); + + ExecutionGraph eg = spy(new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "Test job", + new Configuration(), + AkkaUtils.getDefaultTimeout())); + + eg.setNumberOfRetriesLeft(1); + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + + assertEquals(JobStatus.RUNNING, eg.getState()); + + // Fail with unrecoverable Exception + eg.getAllExecutionVertices().iterator().next().fail( + new UnrecoverableException(new Exception("Test Exception"))); + + assertEquals(JobStatus.FAILING, eg.getState()); + + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + vertex.getCurrentExecutionAttempt().cancelingComplete(); + } + + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); + + // Wait for the job to reach FAILED (it must not restart) + Deadline deadline = timeout.fromNow(); + while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) { + Thread.sleep(100); + } + + assertEquals(JobStatus.FAILED, eg.getState()); + + // No restart + verify(eg, never()).restart(); + assertEquals(1, eg.getNumberOfRetriesLeft()); + } }