http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index 339a813..b16109a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -98,7 +98,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -136,7 +136,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -197,7 +197,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -445,7 +445,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -495,7 +495,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v5, v4)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -550,7 +550,8 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -590,7 +591,7 @@ public class ExecutionGraphConstructionTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); @@ -656,7 +657,7 @@ public class ExecutionGraphConstructionTest { JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); // check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) )
http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 4ee28e6..526ba7f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -101,7 +101,7 @@ public class ExecutionGraphDeploymentTest { v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); List<AbstractJobVertex> ordered = Arrays.asList(v1, v2, v3, v4); @@ -308,7 +308,7 @@ public class ExecutionGraphDeploymentTest { // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); eg.setQueuedSchedulingAllowed(false); List<AbstractJobVertex> ordered = Arrays.asList(v1, v2); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index e8e1f7e..7d8229c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -151,10 +151,10 @@ public class ExecutionGraphTestUtils { ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, - AkkaUtils.DEFAULT_TIMEOUT())); + AkkaUtils.getDefaultTimeout())); Answer<Void> noop = new Answer<Void>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 2f1af70..2d1d9d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -66,7 +66,7 @@ public class ExecutionStateProgressTest { ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); graph.attachJobGraph(Arrays.asList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index ee89954..168ea90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -76,7 +76,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -103,7 +103,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.SCHEDULED); assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); @@ -137,7 +137,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); setVertexState(vertex, ExecutionState.SCHEDULED); @@ -210,7 +210,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); setVertexState(vertex, ExecutionState.SCHEDULED); @@ -288,7 +288,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); final TestActorRef<? extends Actor> taskManager = TestActorRef.create(system, @@ -336,7 +336,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); final ActorRef taskManager = TestActorRef.create(system, Props.create(new @@ -392,7 +392,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); final ActorRef taskManager = TestActorRef.create(system,Props.create(new @@ -435,7 +435,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ActorRef taskManager = TestActorRef.create(system, Props.create(new CancelSequenceTaskManagerCreator())); @@ -478,7 +478,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId(); final ActorRef taskManager = system.actorOf( @@ -524,7 +524,7 @@ public class ExecutionVertexCancelTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELED); assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); @@ -569,7 +569,7 @@ public class ExecutionVertexCancelTest { // scheduling while canceling is an illegal state transition try { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); Scheduler scheduler = mock(Scheduler.class); @@ -583,7 +583,7 @@ public class ExecutionVertexCancelTest { // deploying while in canceling state is illegal (should immediately go to canceled) try { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); Instance instance = getInstance(ActorRef.noSender()); @@ -598,7 +598,7 @@ public class ExecutionVertexCancelTest { // fail while canceling { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); Instance instance = getInstance(ActorRef.noSender()); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index c0d1db8..d8a7db3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -69,7 +69,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(slot); @@ -112,7 +112,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -156,7 +156,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -211,7 +211,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -247,7 +247,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -295,7 +295,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(slot); @@ -334,7 +334,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); final TestActorRef simpleTaskManager = TestActorRef.create(system, Props.create(new http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 8230433..66ef4ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -71,7 +71,7 @@ public class ExecutionVertexSchedulingTest { final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); Scheduler scheduler = mock(Scheduler.class); when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot); @@ -104,7 +104,7 @@ public class ExecutionVertexSchedulingTest { final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); Scheduler scheduler = mock(Scheduler.class); when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future); @@ -140,7 +140,7 @@ public class ExecutionVertexSchedulingTest { final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); Scheduler scheduler = mock(Scheduler.class); when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index e59a65e..ddd7282 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -56,7 +56,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -91,7 +91,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -127,7 +127,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -164,7 +164,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -199,7 +199,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -254,7 +254,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -300,7 +300,7 @@ public class PointwisePatternTest { List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.DEFAULT_TIMEOUT()); + ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 7cbd5a8..4f82dea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -69,7 +69,7 @@ public class VertexSlotSharingTest { List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT()); + AkkaUtils.getDefaultTimeout()); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java deleted file mode 100644 index 94bdef1..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; - -import java.net.InetAddress; - -import akka.actor.ActorRef; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.jobgraph.JobID; -import org.junit.Test; -import org.mockito.Matchers; - -public class AllocatedSlotTest { - - @Test - public void testStateTransitions() { - try { - // cancel, then release - { - SimpleSlot slot = getSlot(); - assertTrue(slot.isAlive()); - - slot.cancel(); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertFalse(slot.isReleased()); - - slot.releaseSlot(); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertTrue(slot.isReleased()); - } - - // release immediately - { - SimpleSlot slot = getSlot(); - assertTrue(slot.isAlive()); - - slot.releaseSlot(); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertTrue(slot.isReleased()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSetExecutionVertex() { - try { - Execution ev = mock(Execution.class); - Execution ev_2 = mock(Execution.class); - - // assign to alive slot - { - SimpleSlot slot = getSlot(); - - assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecution()); - - // try to add another one - assertFalse(slot.setExecutedVertex(ev_2)); - assertEquals(ev, slot.getExecution()); - } - - // assign to canceled slot - { - SimpleSlot slot = getSlot(); - slot.cancel(); - - assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecution()); - } - - // assign to released - { - SimpleSlot slot = getSlot(); - slot.releaseSlot(); - - assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecution()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testReleaseCancelsVertex() { - try { - Execution ev = mock(Execution.class); - - SimpleSlot slot = getSlot(); - assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecution()); - - slot.cancel(); - slot.releaseSlot(); - slot.cancel(); - - verify(ev, times(1)).fail(Matchers.any(Throwable.class)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - public static SimpleSlot getSlot() throws Exception { - HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); - InetAddress address = InetAddress.getByName("127.0.0.1"); - InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - - Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1); - return instance.allocateSimpleSlot(new JobID()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java new file mode 100644 index 0000000..b16bf4b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +import java.net.InetAddress; + +import akka.actor.ActorRef; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.jobgraph.JobID; +import org.junit.Test; +import org.mockito.Matchers; + +public class SimpleSlotTest { + + @Test + public void testStateTransitions() { + try { + // cancel, then release + { + SimpleSlot slot = getSlot(); + assertTrue(slot.isAlive()); + + slot.cancel(); + assertFalse(slot.isAlive()); + assertTrue(slot.isCanceled()); + assertFalse(slot.isReleased()); + + slot.releaseSlot(); + assertFalse(slot.isAlive()); + assertTrue(slot.isCanceled()); + assertTrue(slot.isReleased()); + } + + // release immediately + { + SimpleSlot slot = getSlot(); + assertTrue(slot.isAlive()); + + slot.releaseSlot(); + assertFalse(slot.isAlive()); + assertTrue(slot.isCanceled()); + assertTrue(slot.isReleased()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSetExecutionVertex() { + try { + Execution ev = mock(Execution.class); + Execution ev_2 = mock(Execution.class); + + // assign to alive slot + { + SimpleSlot slot = getSlot(); + + assertTrue(slot.setExecutedVertex(ev)); + assertEquals(ev, slot.getExecution()); + + // try to add another one + assertFalse(slot.setExecutedVertex(ev_2)); + assertEquals(ev, slot.getExecution()); + } + + // assign to canceled slot + { + SimpleSlot slot = getSlot(); + slot.cancel(); + + assertFalse(slot.setExecutedVertex(ev)); + assertNull(slot.getExecution()); + } + + // assign to released + { + SimpleSlot slot = getSlot(); + slot.releaseSlot(); + + assertFalse(slot.setExecutedVertex(ev)); + assertNull(slot.getExecution()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testReleaseCancelsVertex() { + try { + Execution ev = mock(Execution.class); + + SimpleSlot slot = getSlot(); + assertTrue(slot.setExecutedVertex(ev)); + assertEquals(ev, slot.getExecution()); + + slot.cancel(); + slot.releaseSlot(); + slot.cancel(); + + verify(ev, times(1)).fail(Matchers.any(Throwable.class)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + public static SimpleSlot getSlot() throws Exception { + HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); + InetAddress address = InetAddress.getByName("127.0.0.1"); + InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); + + Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1); + return instance.allocateSimpleSlot(new JobID()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 60ede47..d55a67b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -538,10 +538,10 @@ public class TaskManagerTest { Configuration cfg = new Configuration(); cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10); GlobalConfiguration.includeConfiguration(cfg); - String akkaURL = jm.path().toString(); - cfg.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, akkaURL); + String jobManagerURL = jm.path().toString(); - ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system); + ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", + jobManagerURL, cfg, system); Future<Object> response = Patterns.ask(taskManager, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 89d1c4e..007ad5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -47,7 +47,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -61,10 +64,10 @@ public class TaskTest { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskManager taskManager = mock(TaskManager.class); final RuntimeEnvironment env = mock(RuntimeEnvironment.class); - Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); task.setEnvironment(env); assertEquals(ExecutionState.DEPLOYING, task.getExecutionState()); @@ -84,7 +87,7 @@ public class TaskTest { task.markFailed(new Exception("test")); assertTrue(ExecutionState.CANCELED == task.getExecutionState()); - verify(taskManager, times(1)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELED, null); + verify(task, times(1)).notifyExecutionStateChange(ExecutionState.CANCELED, null); } catch (Exception e) { e.printStackTrace(); @@ -99,10 +102,8 @@ public class TaskTest { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskManager taskManager = mock(TaskManager.class); - - - final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); @@ -135,7 +136,7 @@ public class TaskTest { assertEquals(ExecutionState.FINISHED, task.getExecutionState()); - verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState.FINISHED, null); + verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null); } catch (Exception e) { e.printStackTrace(); @@ -150,9 +151,8 @@ public class TaskTest { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskManager taskManager = mock(TaskManager.class); - - final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); @@ -185,7 +185,7 @@ public class TaskTest { // make sure the final state is correct and the task manager knows the changes assertEquals(ExecutionState.FAILED, task.getExecutionState()); - verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState.FAILED), Matchers.any(Throwable.class)); + verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), any(Throwable.class)); } catch (Exception e) { e.printStackTrace(); @@ -199,10 +199,9 @@ public class TaskTest { final JobID jid = new JobID(); final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - - final TaskManager taskManager = mock(TaskManager.class); - - final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + + final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); @@ -246,7 +245,7 @@ public class TaskTest { // make sure the final state is correct and the task manager knows the changes assertEquals(ExecutionState.CANCELED, task.getExecutionState()); - verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELED, null); + verify(task).notifyExecutionStateChange(ExecutionState.CANCELED, null); } catch (Exception e) { e.printStackTrace(); @@ -261,15 +260,14 @@ public class TaskTest { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskManager taskManager = mock(TaskManager.class); - TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), Collections.<PartitionDeploymentDescriptor>emptyList(), Collections.<PartitionConsumerDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0); - Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(), mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class), @@ -284,7 +282,7 @@ public class TaskTest { assertEquals(ExecutionState.FINISHED, task.getExecutionState()); - verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState.FINISHED, null); + verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null); } catch (Exception e) { e.printStackTrace(); @@ -299,15 +297,14 @@ public class TaskTest { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskManager taskManager = mock(TaskManager.class); - TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, new Configuration(), new Configuration(), TestInvokableWithException.class.getName(), Collections.<PartitionDeploymentDescriptor>emptyList(), Collections.<PartitionConsumerDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0); - Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager); + Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender())); + doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class)); RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(), mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class), @@ -322,10 +319,10 @@ public class TaskTest { assertEquals(ExecutionState.FAILED, task.getExecutionState()); - verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState.FAILED), Matchers.any(Throwable.class)); - verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELING, null); - verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELED, null); - verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.FINISHED, null); + verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), any(Throwable.class)); + verify(task, times(0)).notifyExecutionStateChange(ExecutionState.CANCELING, null); + verify(task, times(0)).notifyExecutionStateChange(ExecutionState.CANCELED, null); + verify(task, times(0)).notifyExecutionStateChange(ExecutionState.FINISHED, null); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index 01f3df8..775c0cb 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.executiongraph import akka.actor.{Props, ActorSystem} -import akka.testkit.{TestKit} +import akka.testkit.TestKit import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex} @@ -56,7 +56,7 @@ with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT) + AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(0) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) @@ -100,7 +100,7 @@ with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) val eg = new ExecutionGraph(new JobID(), "Test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT) + AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(1) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 2c04e5c..69b0588 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -59,7 +59,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), - AkkaUtils.DEFAULT_TIMEOUT) + AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(0) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala index fd52f5e..2104034 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala @@ -49,7 +49,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with WrapA val receiver = new AbstractJobVertex("Receiver") sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass((classOf[Receiver])) + receiver.setInvokableClass(classOf[Receiver]) sender.setParallelism(num_tasks) receiver.setParallelism(num_tasks) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala new file mode 100644 index 0000000..88c604b --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import akka.actor.{PoisonPill, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, +NotifyWhenJobManagerTerminated} +import org.apache.flink.runtime.testingUtils.TestingUtils +import org.junit.runner.RunWith +import org.scalatest.{WordSpecLike, Matchers, BeforeAndAfterAll} +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class JobManagerFailsITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender +with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "The TaskManager" should { + "detect a lost connection to the JobManager and try to reconnect to it" in { + val num_slots = 11 + + val cluster = TestingUtils.startTestingClusterDeathWatch(num_slots, 1) + + val tm = cluster.getTaskManagers(0) + val jm = cluster.getJobManager + + try{ + jm ! RequestNumberRegisteredTaskManager + expectMsg(1) + + tm ! NotifyWhenJobManagerTerminated(jm) + + jm ! PoisonPill + + expectMsgClass(classOf[JobManagerTerminated]) + + cluster.restartJobManager() + + cluster.waitForTaskManagersToBeRegistered() + + cluster.getJobManager ! RequestNumberRegisteredTaskManager + + expectMsg(1) + }finally{ + cluster.stop() + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 9bcd163..c2ceac3 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -23,6 +23,7 @@ import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode} +import org.apache.flink.runtime.testingUtils.TestingUtils import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved import org.apache.flink.runtime.testingUtils.TestingUtils @@ -32,6 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scheduler.{NoResourceAvailableException, SlotSharingGroup} import scala.concurrent.duration._ +import scala.language.postfixOps @RunWith(classOf[JUnitRunner]) class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with @@ -40,7 +42,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - override def afterAll: Unit = { + override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index e20471d..a7be14b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -37,7 +37,7 @@ class RecoveryITCase(_system: ActorSystem) extends TestKit(_system) with Implici WordSpecLike with Matchers with BeforeAndAfterAll { def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - override def afterAll: Unit = { + override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala index 57a49cf..289f759 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala @@ -34,7 +34,7 @@ class SlotSharingITCase(_system: ActorSystem) extends TestKit(_system) with Impl WordSpecLike with Matchers with BeforeAndAfterAll { def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - override def afterAll: Unit = { + override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala index 22ee2c1..8d4a9ba 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala @@ -22,9 +22,9 @@ import akka.actor.{Kill, ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{WorkingTaskManager, -RequestWorkingTaskManager, AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished} +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestNumberRegisteredTaskManager, +JobResultFailed, SubmissionSuccess, SubmitJob} +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -36,11 +36,40 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - override def afterAll(): Unit ={ + override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } "The JobManager" should { + + "detect a failing task manager" in { + val num_slots = 11 + + val cluster = TestingUtils.startTestingClusterDeathWatch(num_slots, 2) + + val taskManagers = cluster.getTaskManagers + val jm = cluster.getJobManager + + try{ + within(TestingUtils.TESTING_DURATION){ + jm ! RequestNumberRegisteredTaskManager + expectMsg(2) + + jm ! NotifyWhenTaskManagerTerminated(taskManagers(0)) + + taskManagers(0) ! PoisonPill + + val TaskManagerTerminated(tm) = expectMsgClass(classOf[TaskManagerTerminated]) + + jm ! RequestNumberRegisteredTaskManager + expectMsg(1) + } + }finally{ + cluster.stop() + } + + } + "handle gracefully failing task manager" in { val num_tasks = 31 val sender = new AbstractJobVertex("Sender") http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala index bd542b7..ec41141 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala @@ -31,6 +31,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scala.concurrent.duration._ +import scala.language.postfixOps + class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { @@ -108,10 +110,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "shutdown after the maximum registration duration has been exceeded" in { val config = new Configuration() - config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, self.path.toString) config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second") - val tm = TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", config) + val tm = TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", + self.path.toString, config) watch(tm) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index c2fd543..70f085a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -99,7 +99,7 @@ object Tasks { var reader: RecordReader[IntegerRecord] = _ override def registerInputOutput(): Unit = { - val env = getEnvironment() + val env = getEnvironment reader = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord]) } @@ -207,7 +207,7 @@ object Tasks { var writer: RecordWriter[IntegerRecord] = _ override def registerInputOutput(): Unit = { - writer = new RecordWriter[IntegerRecord](getEnvironment().getWriter(0)) + writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0)) } override def invoke(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index dbed4ff..4b6d2a3 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -25,8 +25,16 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.taskmanager.TaskManager -class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration, - true) { +/** + * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support + * in the same [[ActorSystem]]. + * + * @param userConfiguration Configuration object with the user provided configuration values + * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]], + * otherwise false + */ +class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) extends +FlinkMiniCluster(userConfiguration, singleActorSystem) { override def generateConfiguration(userConfig: Configuration): Configuration = { val cfg = new Configuration() @@ -45,9 +53,15 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster( override def startTaskManager(index: Int)(implicit system: ActorSystem) = { val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(HOSTNAME, configuration, true) + TaskManager.parseConfiguration(HOSTNAME, configuration, + localAkkaCommunication = singleActorSystem, localTaskManagerCommunication = true) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index) } + + def restartJobManager(): Unit = { + jobManagerActorSystem.stop(jobManagerActor) + jobManagerActor = startJobManager(jobManagerActorSystem) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index ad85547..0d7ce60 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.{Cancellable, ActorRef, Props} +import akka.actor.{Cancellable, Terminated, ActorRef, Props} import akka.pattern.{ask, pipe} import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.execution.ExecutionState @@ -31,13 +31,18 @@ import scala.collection.convert.WrapAsScala import scala.concurrent.Future import scala.concurrent.duration._ +import scala.language.postfixOps +/** + * Mixin for [[TestingJobManager]] to support testing messages + */ trait TestingJobManager extends ActorLogMessages with WrapAsScala { that: JobManager => import context._ val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() val waitForAllVerticesToBeRunningOrFinished = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() @@ -96,7 +101,8 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { periodicCheck = None } - case NotifyWhenJobRemoved(jobID) => { + + case NotifyWhenJobRemoved(jobID) => val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager) val responses = tms.map{ @@ -107,8 +113,18 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { import context.dispatcher Future.fold(responses)(true)(_ & _) pipeTo sender - } + case NotifyWhenTaskManagerTerminated(taskManager) => + val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set()) + waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender) + case msg@Terminated(taskManager) => + super.receiveWithLogMessages(msg) + waitForTaskManagerToBeTerminated.get(taskManager.path.name) foreach { + _ foreach { + listener => + listener ! TaskManagerTerminated(taskManager) + } + } case RequestWorkingTaskManager(jobID) => currentJobs.get(jobID) match { case Some((eg, _)) => http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index c59efdc..a5a577b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -48,4 +48,7 @@ object TestingJobManagerMessages { case class JobStatusIs(jobID: JobID, state: JobStatus) case object NotifyListeners + + case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) + case class TaskManagerTerminated(taskManager: ActorRef) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index d3a3526..88d3cd0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -22,6 +22,9 @@ import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.jobmanager.MemoryArchivist import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph} +/** + * Mixin for the [[MemoryArchivist]] to support testing messages + */ trait TestingMemoryArchivist extends ActorLogMessages { self: MemoryArchivist => http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index f1fe3e5..abdb173 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -18,22 +18,28 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.ActorRef +import akka.actor.{Terminated, ActorRef} import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved +import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ import scala.concurrent.duration._ +import scala.language.postfixOps +/** + * Mixin for the [[TaskManager]] to support testing messages + */ trait TestingTaskManager extends ActorLogMessages { that: TaskManager => val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() abstract override def receiveWithLogMessages = { receiveTestMessages orElse super.receiveWithLogMessages @@ -59,21 +65,19 @@ trait TestingTaskManager extends ActorLogMessages { case None => } - case RequestBroadcastVariablesWithReferences => { + case RequestBroadcastVariablesWithReferences => sender ! ResponseBroadcastVariablesWithReferences( bcVarManager.getNumberOfVariablesWithReferences) - } - case RequestNumActiveConnections => { + case RequestNumActiveConnections => networkEnvironment match { case Some(ne) => sender ! ResponseNumActiveConnections( ne.getConnectionManager.getNumberOfActiveConnections) case None => sender ! ResponseNumActiveConnections(0) } - } - case NotifyWhenJobRemoved(jobID) => { + case NotifyWhenJobRemoved(jobID) => if(runningTasks.values.exists(_.getJobID == jobID)){ val set = waitForJobRemoval.getOrElse(jobID, Set()) waitForJobRemoval += (jobID -> (set + sender)) @@ -85,9 +89,8 @@ trait TestingTaskManager extends ActorLogMessages { case None => sender ! true } } - } - - case CheckIfJobRemoved(jobID) => { + + case CheckIfJobRemoved(jobID) => if(runningTasks.values.forall(_.getJobID != jobID)){ waitForJobRemoval.get(jobID) match { case Some(listeners) => listeners foreach (_ ! true) @@ -97,6 +100,18 @@ trait TestingTaskManager extends ActorLogMessages { import context.dispatcher context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID)) } - } + + case NotifyWhenJobManagerTerminated(jobManager) => + val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set()) + waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender) + + case msg@Terminated(jobManager) => + super.receiveWithLogMessages(msg) + + waitForJobManagerToBeTerminated.get(jobManager.path.name) foreach { + _ foreach { + _ ! JobManagerTerminated(jobManager) + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala index bbbf3d2..ebe4555 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.testingUtils +import akka.actor.ActorRef import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.taskmanager.Task @@ -44,16 +45,20 @@ object TestingTaskManagerMessages { case object RequestRunningTasks case object RequestBroadcastVariablesWithReferences + + case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) + + case class JobManagerTerminated(jobManager: ActorRef) // -------------------------------------------------------------------------- // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- - def getRequestRunningTasksMessage() : AnyRef = { + def getRequestRunningTasksMessage: AnyRef = { RequestRunningTasks } - def getRequestBroadcastVariablesWithReferencesMessage() : AnyRef = { + def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = { RequestBroadcastVariablesWithReferences } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 5fd133f..a9d8f51 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -22,81 +22,44 @@ import akka.actor.{ActorRef, Props, ActorSystem} import akka.testkit.CallingThreadDispatcher import com.typesafe.config.ConfigFactory import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.core.io.IOReadableWritable import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue import org.apache.flink.runtime.jobmanager.JobManager -import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import scala.concurrent.duration._ import scala.concurrent.ExecutionContext +import scala.language.postfixOps +/** + * Convenience functions to test actor based components. + */ object TestingUtils { val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString) val TESTING_DURATION = 2 minute - val DEFAULT_AKKA_ASK_TIMEOUT = 1000 + val DEFAULT_AKKA_ASK_TIMEOUT = "200 s" def getDefaultTestingActorSystemConfigString: String = { - val ioRWSerializerClass = classOf[IOReadableWritableSerializer].getCanonicalName - val ioRWClass = classOf[IOReadableWritable].getCanonicalName - val logLevel = AkkaUtils.getLogLevel s"""akka.daemonic = on |akka.test.timefactor = 10 |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] |akka.loglevel = $logLevel - |akka.stdout-loglevel = "OFF" + |akka.stdout-loglevel = $logLevel |akka.jvm-exit-on-fata-error = off |akka.log-config-on-start = off - |akka.actor.serializers { - | IOReadableWritable = "$ioRWSerializerClass" - |} - |akka.actor.serialization-bindings { - | "$ioRWClass" = IOReadableWritable - |} """.stripMargin } - // scalastyle:off line.size.limit - val getTestingSerializationBindings = - """ - |akka { - | actor { - | kryo{ - | kryo-custom-serializer-init = "org.apache.flink.runtime.testingUtils.KryoTestingInitializer" - | } - | - | serialization-bindings { - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$RequestExecutionGraph" = kryo - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphFound" = kryo - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphNotFound" = kryo - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$WaitForAllVerticesToBeRunning" = kryo - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$AllVerticesRunning" = kryo - | "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$NotifyWhenJobRemoved" = kryo - | - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$NotifyWhenTaskRemoved" = kryo - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestRunningTasks$" = kryo - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseRunningTasks" = kryo - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestBroadcastVariablesWithReferences$" = kryo - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseBroadcastVariablesWithReferences" = kryo - | "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$CheckIfJobRemoved" = kryo - | } - | } - |} - """.stripMargin - // scalastyle:on line.size.limit - - - def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration) + def startTestingTaskManagerWithConfiguration(hostname: String, jobManagerURL: String, + config: Configuration) (implicit system: ActorSystem) = { - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(hostname, config) - + val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) = + TaskManager.parseConfiguration(hostname, config, + localAkkaCommunication = true, localTaskManagerCommunication = false) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) with TestingTaskManager)) } @@ -110,24 +73,39 @@ object TestingUtils { def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { val jmURL = jobManager.path.toString val config = new Configuration() - config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, jmURL) - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration("LOCALHOST", config) + val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) = + TaskManager.parseConfiguration("localhost", config, + localAkkaCommunication = true, localTaskManagerCommunication = true) - system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, + system.actorOf(Props(new TaskManager(connectionInfo, jmURL, taskManagerConfig, networkConnectionConfig) with TestingTaskManager)) } - def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT): - FlinkMiniCluster = { + def startTestingCluster(numSlots: Int, numTMs: Int = 1, + timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): + TestingCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs) config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000) - config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) new TestingCluster(config) } + def startTestingClusterDeathWatch(numSlots: Int, numTMs: Int, + timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): + TestingCluster = { + val config = new Configuration() + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs) + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms") + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "50 ms") + config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 1) + + new TestingCluster(config, singleActorSystem = false) + } + def setGlobalExecutionContext(): Unit = { AkkaUtils.globalExecutionContext = ExecutionContext.global } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala index 9740c82..c695988 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala @@ -86,7 +86,7 @@ object ClosureCleaner { private def getInnerClasses(obj: AnyRef): List[Class[_]] = { val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) - while (!stack.isEmpty) { + while (stack.nonEmpty) { val cr = getClassReader(stack.head) stack = stack.tail val set = Set[Class[_]]() http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala index 9969dc0..85d8599 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala @@ -252,7 +252,7 @@ class CoGroupDataSet[L, R]( case ( Left(position), order ) => result.add( new ImmutablePair[java.lang.Integer, Order](position, order)) - case ( Right(expression), order ) => { + case ( Right(expression), order ) => if (!typeInfo.isInstanceOf[CompositeType[_]]) { throw new InvalidProgramException("Specifying order keys via field positions is only " + "valid for composite data types (pojo / tuple / case class)") @@ -265,7 +265,6 @@ class CoGroupDataSet[L, R]( result.add(new ImmutablePair[java.lang.Integer, Order](k, order)) } } - } } result http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala index 122d4ff..d883ba1 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala @@ -137,7 +137,7 @@ private[flink] trait TreeGen[C <: Context] { this: MacroContextHolder[C] with Ty args: List[(String, Type)], ret: Type, impl: Tree): Tree = { - val valParams = args map { case (name, tpe) => + val valParams = args map { case (`name`, tpe) => ValDef(Modifiers(Flag.PARAM), newTermName(name), TypeTree(tpe), EmptyTree) } DefDef(Modifiers(flags), newTermName(name), Nil, List(valParams), TypeTree(ret), impl) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala index 20a41d9..f6630a0 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.language.postfixOps import scala.reflect.macros.Context @@ -252,7 +253,7 @@ private[flink] trait TypeInformationGen[C <: Context] { for (field <- traversalClazz.getDeclaredFields) { if (clazzFields.contains(field.getName)) { println(s"The field $field is already contained in the " + - s"hierarchy of the class ${clazz}. Please use unique field names throughout " + + s"hierarchy of the class $clazz. Please use unique field names throughout " + "your class hierarchy") error = true } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index eb2441d..6169af3 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -49,10 +49,9 @@ abstract class CaseClassSerializer[T <: Product]( createInstance(fields) } catch { - case t: Throwable => { + case t: Throwable => instanceCreationFailed = true null.asInstanceOf[T] - } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala index b38764c..6ac5c72 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala @@ -20,6 +20,8 @@ package org.apache.flink.streaming.scala.examples.socket import org.apache.flink.streaming.api.scala._ +import scala.language.postfixOps + /** * This example shows an implementation of WordCount with data from a text socket. * To run the example make sure that the service providing the text data is already up and running. http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index 1091aa3..714686e 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.scala.windowing.{Delta, Time} import scala.Stream._ import scala.math._ +import scala.language.postfixOps import scala.util.Random /** @@ -78,13 +79,15 @@ object TopSpeedWindowing { numOfCars = args(0).toInt evictionSec = args(1).toInt triggerMeters = args(2).toDouble + true } else { System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>") false } + }else{ + true } - true } var numOfCars = 2 http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index 0b78365..119862e 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala._ import scala.Stream._ import scala.util.Random import java.util.concurrent.TimeUnit +import scala.language.postfixOps object WindowJoin {
