[FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished

This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. Failing the producer is problematic, since a finished producer task will simply swallow the scheduling exception originating from scheduling the consumer task.
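
For context, the swallowing happens because Execution#fail() is guarded by the task's state machine: FINISHED is a terminal state, so a finished producer can no longer transition to FAILED and the exception is silently dropped. A simplified sketch of that guard (abridged, not the actual Flink code):

    void fail(Throwable t) {
        // Only non-terminal states may move to FAILED. For a producer that is
        // already FINISHED, no transition fires and t is effectively lost.
        if (transitionState(ExecutionState.DEPLOYING, ExecutionState.FAILED, t)
                || transitionState(ExecutionState.RUNNING, ExecutionState.FAILED, t)) {
            // ... release the assigned slot and notify the ExecutionGraph ...
        }
    }

Failing the consumer's Execution instead targets a task that is still in a non-terminal state, so the scheduling exception reaches the ExecutionGraph and fails the job as intended.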

This closes #2321.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24951845
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24951845
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24951845

Branch: refs/heads/release-1.1
Commit: 24951845131c6b191a04fa569c22e2e8cbc71fa8
Parents: 383a1cc
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Aug 1 18:05:14 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 2 20:24:36 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  2 +-
 .../ExecutionGraphDeploymentTest.java           | 64 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24951845/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1b32100..fd296c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -548,7 +548,7 @@ public class Execution implements Serializable {
                                                                        consumerVertex.getExecutionGraph().getScheduler(),
                                                                        consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
                                                } catch (Throwable t) {
-                                                       fail(new IllegalStateException("Could not schedule consumer " +
+                                                       consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
                                                                        "vertex " + consumerVertex, t));
                                                }
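
For readers without the surrounding code: the try/catch above lives in the producer's Execution while it schedules its consumers, so the previously unqualified fail(t) meant this.fail(t) on the (already finished) producer. Paraphrasing the fixed call site with the wrapped lines rejoined (assuming the enclosing call is ExecutionVertex#scheduleForExecution, which matches the context lines above):

    try {
        consumerVertex.scheduleForExecution(
            consumerVertex.getExecutionGraph().getScheduler(),
            consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
    } catch (Throwable t) {
        // Attribute the scheduling failure to the consumer vertex, whose
        // execution can still transition to FAILED.
        consumerVertex.fail(new IllegalStateException(
            "Could not schedule consumer vertex " + consumerVertex, t));
    }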
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24951845/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index a599f42..2d0ae41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,11 +27,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -40,12 +44,16 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -276,6 +284,62 @@ public class ExecutionGraphDeploymentTest {
                }
        }
 
+       /**
+        * Tests that a blocking batch job fails if there are not enough resources left to schedule the
+        * succeeding tasks. This test case is related to [FLINK-4296], where finished producer tasks
+        * swallowed the scheduling exception when scheduling a consumer task.
+        */
+       @Test
+       public void testNoResourceAvailableFailure() throws Exception {
+               final JobID jobId = new JobID();
+               JobVertex v1 = new JobVertex("source");
+               JobVertex v2 = new JobVertex("sink");
+
+               int dop1 = 1;
+               int dop2 = 1;
+
+               v1.setParallelism(dop1);
+               v2.setParallelism(dop2);
+
+               v1.setInvokableClass(BatchTask.class);
+               v2.setInvokableClass(BatchTask.class);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
+
+               // execution graph that executes actions synchronously
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.directExecutionContext(),
+                       jobId,
+                       "failing test job",
+                       new Configuration(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
+
+               eg.setQueuedSchedulingAllowed(false);
+
+               List<JobVertex> ordered = Arrays.asList(v1, v2);
+               eg.attachJobGraph(ordered);
+
+               Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+               for (int i = 0; i < dop1; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new ExecutionGraphTestUtils.SimpleActorGateway(
+                                               TestingUtils.directExecutionContext())));
+               }
+               assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
+
+               // schedule, this triggers mock deployment
+               eg.scheduleForExecution(scheduler);
+
+               ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
+               eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
+               eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null, new AccumulatorSnapshot(jobId, attemptID, new HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>>(), new HashMap<String, Accumulator<?, ?>>())));
+
+               assertEquals(JobStatus.FAILED, eg.getState());
+       }
+
        private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
                final JobID jobId = new JobID();
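
A possible follow-up to the test above (not part of this patch): also assert the failure cause, assuming ExecutionGraph#getFailureCause() is available in this version; a hypothetical check:

    // Hypothetical extra assertions, not in the committed test: the fix wraps
    // the scheduler's NoResourceAvailableException in an IllegalStateException.
    Throwable cause = eg.getFailureCause();
    assertTrue(cause instanceof IllegalStateException);
    assertTrue(cause.getCause() instanceof NoResourceAvailableException);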
 
