http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 339a813..b16109a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -98,7 +98,7 @@ public class ExecutionGraphConstructionTest {
                
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -136,7 +136,7 @@ public class ExecutionGraphConstructionTest {
                
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -197,7 +197,7 @@ public class ExecutionGraphConstructionTest {
                
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -445,7 +445,7 @@ public class ExecutionGraphConstructionTest {
                
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -495,7 +495,7 @@ public class ExecutionGraphConstructionTest {
                
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -550,7 +550,8 @@ public class ExecutionGraphConstructionTest {
                        
                        List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-                       ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg, AkkaUtils.DEFAULT_TIMEOUT());
+                       ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
+                                       AkkaUtils.getDefaultTimeout());
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -590,7 +591,7 @@ public class ExecutionGraphConstructionTest {
                        List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
 
                        ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -656,7 +657,7 @@ public class ExecutionGraphConstructionTest {
                        JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, 
v4, v5, v6, v7, v8);
                        
                        ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        
                        // check the v1 / v2 co location hints ( assumes 
parallelism(v1) >= parallelism(v2) )

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 4ee28e6..526ba7f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -101,7 +101,7 @@ public class ExecutionGraphDeploymentTest {
                        v4.connectNewDataSetAsInput(v2, 
DistributionPattern.ALL_TO_ALL);
 
                        ExecutionGraph eg = new ExecutionGraph(jobId, "some 
job", new Configuration(),
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        List<AbstractJobVertex> ordered = Arrays.asList(v1, v2, 
v3, v4);
 
@@ -308,7 +308,7 @@ public class ExecutionGraphDeploymentTest {
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new 
Configuration(),
-                               AkkaUtils.DEFAULT_TIMEOUT());
+                               AkkaUtils.getDefaultTimeout());
                eg.setQueuedSchedulingAllowed(false);
 
                List<AbstractJobVertex> ordered = Arrays.asList(v1, v2);

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index e8e1f7e..7d8229c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -151,10 +151,10 @@ public class ExecutionGraphTestUtils {
                ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
                
                ExecutionGraph graph = new ExecutionGraph(new JobID(), "test 
job", new Configuration(),
-                               AkkaUtils.DEFAULT_TIMEOUT());
+                               AkkaUtils.getDefaultTimeout());
                
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
-                               AkkaUtils.DEFAULT_TIMEOUT()));
+                               AkkaUtils.getDefaultTimeout()));
                
                Answer<Void> noop = new Answer<Void>() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 2f1af70..2d1d9d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -66,7 +66,7 @@ public class ExecutionStateProgressTest {
                        
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
                        
                        ExecutionGraph graph = new ExecutionGraph(jid, "test 
job", new Configuration(),
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        graph.attachJobGraph(Arrays.asList(ajv));
                        
                        setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index ee89954..168ea90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -76,7 +76,7 @@ public class ExecutionVertexCancelTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        
@@ -103,7 +103,7 @@ public class ExecutionVertexCancelTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        setVertexState(vertex, ExecutionState.SCHEDULED);
                        assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
@@ -137,7 +137,7 @@ public class ExecutionVertexCancelTest {
                                final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                               AkkaUtils.DEFAULT_TIMEOUT());
+                                               AkkaUtils.getDefaultTimeout());
                                final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                setVertexState(vertex, 
ExecutionState.SCHEDULED);
@@ -210,7 +210,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
                                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                        setVertexState(vertex, 
ExecutionState.SCHEDULED);
@@ -288,7 +288,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
                                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                        final TestActorRef<? extends Actor> 
taskManager = TestActorRef.create(system,
@@ -336,7 +336,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
                                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                        final ActorRef taskManager = 
TestActorRef.create(system, Props.create(new
@@ -392,7 +392,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
                                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                        final ActorRef taskManager = 
TestActorRef.create(system,Props.create(new
@@ -435,7 +435,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
 
                                        final ActorRef taskManager = 
TestActorRef.create(system, Props.create(new
                                                        
CancelSequenceTaskManagerCreator()));
@@ -478,7 +478,7 @@ public class ExecutionVertexCancelTest {
                                        final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
                                        final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.DEFAULT_TIMEOUT());
+                                                       
AkkaUtils.getDefaultTimeout());
                                        final ExecutionAttemptID execID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                                        final ActorRef taskManager = 
system.actorOf(
@@ -524,7 +524,7 @@ public class ExecutionVertexCancelTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        setVertexState(vertex, ExecutionState.CANCELED);
                        
                        assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
@@ -569,7 +569,7 @@ public class ExecutionVertexCancelTest {
                        // scheduling while canceling is an illegal state 
transition
                        try {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                               AkkaUtils.DEFAULT_TIMEOUT());
+                                               AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
                                
                                Scheduler scheduler = mock(Scheduler.class);
@@ -583,7 +583,7 @@ public class ExecutionVertexCancelTest {
                        // deploying while in canceling state is illegal 
(should immediately go to canceled)
                        try {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                               AkkaUtils.DEFAULT_TIMEOUT());
+                                               AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
                                
                                Instance instance = 
getInstance(ActorRef.noSender());
@@ -598,7 +598,7 @@ public class ExecutionVertexCancelTest {
                        // fail while canceling
                        {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                               AkkaUtils.DEFAULT_TIMEOUT());
+                                               AkkaUtils.getDefaultTimeout());
                                
                                Instance instance = 
getInstance(ActorRef.noSender());
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index c0d1db8..d8a7db3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -69,7 +69,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        vertex.deployToSlot(slot);
@@ -112,7 +112,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        
@@ -156,7 +156,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        
@@ -211,7 +211,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        
@@ -247,7 +247,7 @@ public class ExecutionVertexDeploymentTest {
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        
@@ -295,7 +295,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        vertex.deployToSlot(slot);
@@ -334,7 +334,7 @@ public class ExecutionVertexDeploymentTest {
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        final ExecutionAttemptID eid = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                        final TestActorRef simpleTaskManager = 
TestActorRef.create(system, Props.create(new

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 8230433..66ef4ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -71,7 +71,7 @@ public class ExecutionVertexSchedulingTest {
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(new 
JobVertexID());
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        Scheduler scheduler = mock(Scheduler.class);
                        
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
@@ -104,7 +104,7 @@ public class ExecutionVertexSchedulingTest {
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(new 
JobVertexID());
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        Scheduler scheduler = mock(Scheduler.class);
                        
when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
@@ -140,7 +140,7 @@ public class ExecutionVertexSchedulingTest {
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(new 
JobVertexID());
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        
                        Scheduler scheduler = mock(Scheduler.class);
                        
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index e59a65e..ddd7282 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -56,7 +56,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -91,7 +91,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -127,7 +127,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -164,7 +164,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -199,7 +199,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -254,7 +254,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -300,7 +300,7 @@ public class PointwisePatternTest {
        
                List<AbstractJobVertex> ordered = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.DEFAULT_TIMEOUT());
+               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 7cbd5a8..4f82dea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -69,7 +69,7 @@ public class VertexSlotSharingTest {
                        List<AbstractJobVertex> vertices = new 
ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
                        
                        ExecutionGraph eg = new ExecutionGraph(new JobID(), 
"test job", new Configuration(),
-                                       AkkaUtils.DEFAULT_TIMEOUT());
+                                       AkkaUtils.getDefaultTimeout());
                        eg.attachJobGraph(vertices);
                        
                        // verify that the vertices are all in the same slot 
sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
deleted file mode 100644
index 94bdef1..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Test;
-import org.mockito.Matchers;
-
-public class AllocatedSlotTest {
-
-       @Test
-       public void testStateTransitions() {
-               try {
-                       // cancel, then release
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.isAlive());
-                               
-                               slot.cancel();
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertFalse(slot.isReleased());
-                               
-                               slot.releaseSlot();
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertTrue(slot.isReleased());
-                       }
-                       
-                       // release immediately
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.isAlive());
-                               
-                               slot.releaseSlot();
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertTrue(slot.isReleased());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSetExecutionVertex() {
-               try {
-                       Execution ev = mock(Execution.class);
-                       Execution ev_2 = mock(Execution.class);
-                       
-                       // assign to alive slot
-                       {
-                               SimpleSlot slot = getSlot();
-                               
-                               assertTrue(slot.setExecutedVertex(ev));
-                               assertEquals(ev, slot.getExecution());
-                               
-                               // try to add another one
-                               assertFalse(slot.setExecutedVertex(ev_2));
-                               assertEquals(ev, slot.getExecution());
-                       }
-                       
-                       // assign to canceled slot
-                       {
-                               SimpleSlot slot = getSlot();
-                               slot.cancel();
-                               
-                               assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecution());
-                       }
-                       
-                       // assign to released
-                       {
-                               SimpleSlot slot = getSlot();
-                               slot.releaseSlot();
-                               
-                               assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecution());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testReleaseCancelsVertex() {
-               try {
-                       Execution ev = mock(Execution.class);
-                       
-                       SimpleSlot slot = getSlot();
-                       assertTrue(slot.setExecutedVertex(ev));
-                       assertEquals(ev, slot.getExecution());
-                       
-                       slot.cancel();
-                       slot.releaseSlot();
-                       slot.cancel();
-                       
-                       verify(ev, 
times(1)).fail(Matchers.any(Throwable.class));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       public static SimpleSlot getSlot() throws Exception {
-               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-               InetAddress address = InetAddress.getByName("127.0.0.1");
-               InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
-               
-               Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 1);
-               return instance.allocateSimpleSlot(new JobID());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
new file mode 100644
index 0000000..b16bf4b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class SimpleSlotTest {
+
+       @Test
+       public void testStateTransitions() {
+               try {
+                       // cancel, then release
+                       {
+                               SimpleSlot slot = getSlot();
+                               assertTrue(slot.isAlive());
+                               
+                               slot.cancel();
+                               assertFalse(slot.isAlive());
+                               assertTrue(slot.isCanceled());
+                               assertFalse(slot.isReleased());
+                               
+                               slot.releaseSlot();
+                               assertFalse(slot.isAlive());
+                               assertTrue(slot.isCanceled());
+                               assertTrue(slot.isReleased());
+                       }
+                       
+                       // release immediately
+                       {
+                               SimpleSlot slot = getSlot();
+                               assertTrue(slot.isAlive());
+                               
+                               slot.releaseSlot();
+                               assertFalse(slot.isAlive());
+                               assertTrue(slot.isCanceled());
+                               assertTrue(slot.isReleased());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSetExecutionVertex() {
+               try {
+                       Execution ev = mock(Execution.class);
+                       Execution ev_2 = mock(Execution.class);
+                       
+                       // assign to alive slot
+                       {
+                               SimpleSlot slot = getSlot();
+                               
+                               assertTrue(slot.setExecutedVertex(ev));
+                               assertEquals(ev, slot.getExecution());
+                               
+                               // try to add another one
+                               assertFalse(slot.setExecutedVertex(ev_2));
+                               assertEquals(ev, slot.getExecution());
+                       }
+                       
+                       // assign to canceled slot
+                       {
+                               SimpleSlot slot = getSlot();
+                               slot.cancel();
+                               
+                               assertFalse(slot.setExecutedVertex(ev));
+                               assertNull(slot.getExecution());
+                       }
+                       
+                       // assign to released
+                       {
+                               SimpleSlot slot = getSlot();
+                               slot.releaseSlot();
+                               
+                               assertFalse(slot.setExecutedVertex(ev));
+                               assertNull(slot.getExecution());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testReleaseCancelsVertex() {
+               try {
+                       Execution ev = mock(Execution.class);
+                       
+                       SimpleSlot slot = getSlot();
+                       assertTrue(slot.setExecutedVertex(ev));
+                       assertEquals(ev, slot.getExecution());
+                       
+                       slot.cancel();
+                       slot.releaseSlot();
+                       slot.cancel();
+                       
+                       verify(ev, 
times(1)).fail(Matchers.any(Throwable.class));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       public static SimpleSlot getSlot() throws Exception {
+               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+               InetAddress address = InetAddress.getByName("127.0.0.1");
+               InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
+               
+               Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 1);
+               return instance.allocateSimpleSlot(new JobID());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 60ede47..d55a67b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -538,10 +538,10 @@ public class TaskManagerTest {
                Configuration cfg = new Configuration();
                cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
10);
                GlobalConfiguration.includeConfiguration(cfg);
-               String akkaURL = jm.path().toString();
-               cfg.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, akkaURL);
+               String jobManagerURL = jm.path().toString();
 
-               ActorRef taskManager = 
TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
+               ActorRef taskManager = 
TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
+                               jobManagerURL, cfg, system);
 
                Future<Object> response = Patterns.ask(taskManager, 
                                
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 89d1c4e..007ad5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -47,7 +47,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -61,10 +64,10 @@ public class TaskTest {
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
                        
-                       final TaskManager taskManager = mock(TaskManager.class);
                        final RuntimeEnvironment env = 
mock(RuntimeEnvironment.class);
                        
-                       Task task = new Task(jid, vid, 2, 7, eid, "TestTask", 
taskManager);
+                       Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        task.setEnvironment(env);
                        
                        assertEquals(ExecutionState.DEPLOYING, 
task.getExecutionState());
@@ -84,7 +87,7 @@ public class TaskTest {
                        task.markFailed(new Exception("test"));
                        assertTrue(ExecutionState.CANCELED == 
task.getExecutionState());
                        
-                       verify(taskManager, 
times(1)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELED, null);
+                       verify(task, 
times(1)).notifyExecutionStateChange(ExecutionState.CANCELED, null);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -99,10 +102,8 @@ public class TaskTest {
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
                        
-                       final TaskManager taskManager = mock(TaskManager.class);
-                       
-                       
-                       final Task task = new Task(jid, vid, 2, 7, eid, 
"TestTask", taskManager);
+                       final Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        
                        final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
                        
@@ -135,7 +136,7 @@ public class TaskTest {
                        
                        assertEquals(ExecutionState.FINISHED, 
task.getExecutionState());
                        
-                       verify(taskManager).notifyExecutionStateChange(jid, 
eid, ExecutionState.FINISHED, null);
+                       
verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -150,9 +151,8 @@ public class TaskTest {
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
                        
-                       final TaskManager taskManager = mock(TaskManager.class);
-                       
-                       final Task task = new Task(jid, vid, 2, 7, eid, 
"TestTask", taskManager);
+                       final Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        
                        final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
                        
@@ -185,7 +185,7 @@ public class TaskTest {
                        
                        // make sure the final state is correct and the task 
manager knows the changes
                        assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
-                       
verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), 
Matchers.eq(eid), Matchers.eq(ExecutionState.FAILED), 
Matchers.any(Throwable.class));
+                       
verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), 
any(Throwable.class));
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -199,10 +199,9 @@ public class TaskTest {
                        final JobID jid = new JobID();
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
-                       
-                       final TaskManager taskManager = mock(TaskManager.class);
-                       
-                       final Task task = new Task(jid, vid, 2, 7, eid, 
"TestTask", taskManager);
+
+                       final Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        
                        final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
                        
@@ -246,7 +245,7 @@ public class TaskTest {
                        
                        // make sure the final state is correct and the task 
manager knows the changes
                        assertEquals(ExecutionState.CANCELED, 
task.getExecutionState());
-                       verify(taskManager).notifyExecutionStateChange(jid, 
eid, ExecutionState.CANCELED, null);
+                       
verify(task).notifyExecutionStateChange(ExecutionState.CANCELED, null);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -261,15 +260,14 @@ public class TaskTest {
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
                        
-                       final TaskManager taskManager = mock(TaskManager.class);
-                       
                        TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
                                        new Configuration(), new 
Configuration(), TestInvokableCorrect.class.getName(),
                                        
Collections.<PartitionDeploymentDescriptor>emptyList(),
                                        
Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
                                        new ArrayList<BlobKey>(), 0);
                        
-                       Task task = new Task(jid, vid, 2, 7, eid, "TestTask", 
taskManager);
+                       Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        
                        RuntimeEnvironment env = new 
RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
                                        mock(MemoryManager.class), 
mock(IOManager.class), mock(InputSplitProvider.class),
@@ -284,7 +282,7 @@ public class TaskTest {
                        
                        assertEquals(ExecutionState.FINISHED, 
task.getExecutionState());
                        
-                       verify(taskManager).notifyExecutionStateChange(jid, 
eid, ExecutionState.FINISHED, null);
+                       
verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -299,15 +297,14 @@ public class TaskTest {
                        final JobVertexID vid = new JobVertexID();
                        final ExecutionAttemptID eid = new ExecutionAttemptID();
                        
-                       final TaskManager taskManager = mock(TaskManager.class);
-                       
                        TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
                                        new Configuration(), new 
Configuration(), TestInvokableWithException.class.getName(),
                                        
Collections.<PartitionDeploymentDescriptor>emptyList(),
                                        
Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
                                        new ArrayList<BlobKey>(), 0);
                        
-                       Task task = new Task(jid, vid, 2, 7, eid, "TestTask", 
taskManager);
+                       Task task = spy(new Task(jid, vid, 2, 7, eid, 
"TestTask", ActorRef.noSender()));
+                       
doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), 
any(Throwable.class));
                        
                        RuntimeEnvironment env = new 
RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
                                        mock(MemoryManager.class), 
mock(IOManager.class), mock(InputSplitProvider.class),
@@ -322,10 +319,10 @@ public class TaskTest {
                        
                        assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
                        
-                       
verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), 
Matchers.eq(eid), Matchers.eq(ExecutionState.FAILED), 
Matchers.any(Throwable.class));
-                       verify(taskManager, 
times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELING, null);
-                       verify(taskManager, 
times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.CANCELED, null);
-                       verify(taskManager, 
times(0)).notifyExecutionStateChange(jid, eid, ExecutionState.FINISHED, null);
+                       
verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), 
any(Throwable.class));
+                       verify(task, 
times(0)).notifyExecutionStateChange(ExecutionState.CANCELING, null);
+                       verify(task, 
times(0)).notifyExecutionStateChange(ExecutionState.CANCELED, null);
+                       verify(task, 
times(0)).notifyExecutionStateChange(ExecutionState.FINISHED, null);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 01f3df8..775c0cb 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.executiongraph
 
 import akka.actor.{Props, ActorSystem}
-import akka.testkit.{TestKit}
+import akka.testkit.TestKit
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, 
AbstractJobVertex}
@@ -56,7 +56,7 @@ with Matchers with BeforeAndAfterAll {
         val jobGraph = new JobGraph("Pointwise job", sender)
 
         val eg = new ExecutionGraph(new JobID(), "test job", new 
Configuration(),
-          AkkaUtils.DEFAULT_TIMEOUT)
+          AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(0)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
@@ -100,7 +100,7 @@ with Matchers with BeforeAndAfterAll {
         val jobGraph = new JobGraph("Pointwise job", sender)
 
         val eg = new ExecutionGraph(new JobID(), "Test job", new 
Configuration(),
-          AkkaUtils.DEFAULT_TIMEOUT)
+          AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(1)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 2c04e5c..69b0588 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -59,7 +59,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         val jobGraph = new JobGraph("Pointwise job", sender)
 
         val eg = new ExecutionGraph(new JobID(), "test job", new 
Configuration(),
-          AkkaUtils.DEFAULT_TIMEOUT)
+          AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(0)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index fd52f5e..2104034 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -49,7 +49,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with WrapA
       val receiver = new AbstractJobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass((classOf[Receiver]))
+      receiver.setInvokableClass(classOf[Receiver])
 
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala
new file mode 100644
index 0000000..88c604b
--- /dev/null
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import akka.actor.{PoisonPill, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit}
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager
+import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated,
+NotifyWhenJobManagerTerminated}
+import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.{WordSpecLike, Matchers, BeforeAndAfterAll}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class JobManagerFailsITCase(_system: ActorSystem) extends TestKit(_system) 
with ImplicitSender
+with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The TaskManager" should {
+    "detect a lost connection to the JobManager and try to reconnect to it" in 
{
+      val num_slots = 11
+
+      val cluster = TestingUtils.startTestingClusterDeathWatch(num_slots, 1)
+
+      val tm = cluster.getTaskManagers(0)
+      val jm = cluster.getJobManager
+
+      try{
+        jm ! RequestNumberRegisteredTaskManager
+        expectMsg(1)
+
+        tm ! NotifyWhenJobManagerTerminated(jm)
+
+        jm ! PoisonPill
+
+        expectMsgClass(classOf[JobManagerTerminated])
+
+        cluster.restartJobManager()
+
+        cluster.waitForTaskManagersToBeRegistered()
+
+        cluster.getJobManager ! RequestNumberRegisteredTaskManager
+
+        expectMsg(1)
+      }finally{
+        cluster.stop()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 9bcd163..c2ceac3 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph, ScheduleMode}
+import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -32,6 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers, 
WordSpecLike}
 import scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with 
ImplicitSender with
@@ -40,7 +42,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index e20471d..a7be14b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -37,7 +37,7 @@ class RecoveryITCase(_system: ActorSystem) extends 
TestKit(_system) with Implici
 WordSpecLike with Matchers with BeforeAndAfterAll {
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 57a49cf..289f759 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -34,7 +34,7 @@ class SlotSharingITCase(_system: ActorSystem) extends 
TestKit(_system) with Impl
 WordSpecLike with Matchers with BeforeAndAfterAll {
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
index 22ee2c1..8d4a9ba 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
@@ -22,9 +22,9 @@ import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, 
SubmissionSuccess, SubmitJob}
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{WorkingTaskManager,
-RequestWorkingTaskManager, AllVerticesRunning, 
WaitForAllVerticesToBeRunningOrFinished}
+import 
org.apache.flink.runtime.messages.JobManagerMessages.{RequestNumberRegisteredTaskManager,
+JobResultFailed, SubmissionSuccess, SubmitJob}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -36,11 +36,40 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
-  override def afterAll(): Unit ={
+  override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 
   "The JobManager" should {
+
+    "detect a failing task manager" in {
+      val num_slots = 11
+
+      val cluster = TestingUtils.startTestingClusterDeathWatch(num_slots, 2)
+
+      val taskManagers = cluster.getTaskManagers
+      val jm = cluster.getJobManager
+
+      try{
+        within(TestingUtils.TESTING_DURATION){
+          jm ! RequestNumberRegisteredTaskManager
+          expectMsg(2)
+
+          jm ! NotifyWhenTaskManagerTerminated(taskManagers(0))
+
+          taskManagers(0) ! PoisonPill
+
+          expectMsgClass(classOf[TaskManagerTerminated])
+
+          jm ! RequestNumberRegisteredTaskManager
+          expectMsg(1)
+        }
+      }finally{
+        cluster.stop()
+      }
+
+    }
+
     "handle gracefully failing task manager" in {
       val num_tasks = 31
       val sender = new AbstractJobVertex("Sender")

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
index bd542b7..ec41141 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
@@ -31,6 +31,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import scala.concurrent.duration._
 
+import scala.language.postfixOps
+
 class TaskManagerRegistrationITCase(_system: ActorSystem) extends 
TestKit(_system) with
 ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
@@ -108,10 +110,10 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
     "shutdown after the maximum registration duration has been exceeded" in {
 
       val config = new Configuration()
-      config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, 
self.path.toString)
       config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
"1 second")
 
-      val tm = 
TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", config)
+      val tm = 
TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST",
+        self.path.toString, config)
 
       watch(tm)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index c2fd543..70f085a 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -99,7 +99,7 @@ object Tasks {
     var reader: RecordReader[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      val env = getEnvironment()
+      val env = getEnvironment
 
       reader = new RecordReader[IntegerRecord](env.getReader(0), 
classOf[IntegerRecord])
     }
@@ -207,7 +207,7 @@ object Tasks {
     var writer: RecordWriter[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](getEnvironment().getWriter(0))
+      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index dbed4ff..4b6d2a3 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -25,8 +25,16 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 
-class TestingCluster(userConfiguration: Configuration) extends 
FlinkMiniCluster(userConfiguration,
-  true) {
+/**
+ * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors 
with testing support
+ * in the same [[ActorSystem]].
+ *
+ * @param userConfiguration Configuration object with the user provided 
configuration values
+ * @param singleActorSystem true if all actors shall be running in the same 
[[ActorSystem]],
+ *                          otherwise false
+ */
+class TestingCluster(userConfiguration: Configuration, singleActorSystem: 
Boolean = true) extends
+FlinkMiniCluster(userConfiguration, singleActorSystem) {
 
   override def generateConfiguration(userConfig: Configuration): Configuration 
= {
     val cfg = new Configuration()
@@ -45,9 +53,15 @@ class TestingCluster(userConfiguration: Configuration) 
extends FlinkMiniCluster(
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
     val (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfig) =
-      TaskManager.parseConfiguration(HOSTNAME, configuration, true)
+      TaskManager.parseConfiguration(HOSTNAME, configuration,
+        localAkkaCommunication = singleActorSystem, 
localTaskManagerCommunication = true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager), 
TaskManager.TASK_MANAGER_NAME + index)
   }
+
+  def restartJobManager(): Unit = {
+    jobManagerActorSystem.stop(jobManagerActor)
+    jobManagerActor = startJobManager(jobManagerActorSystem)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index ad85547..0d7ce60 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Cancellable, ActorRef, Props}
+import akka.actor.{Cancellable, Terminated, ActorRef, Props}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
@@ -31,13 +31,18 @@ import scala.collection.convert.WrapAsScala
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
+import scala.language.postfixOps
 
+/**
+ * Mixin for [[TestingJobManager]] to support testing messages
+ */
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {
   that: JobManager =>
 
   import context._
 
   val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, 
Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = 
scala.collection.mutable.HashMap[String, Set[ActorRef]]()
 
   val waitForAllVerticesToBeRunningOrFinished =
     scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
@@ -96,7 +101,8 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
         periodicCheck = None
       }
 
-    case NotifyWhenJobRemoved(jobID) => {
+
+    case NotifyWhenJobRemoved(jobID) =>
       val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
 
       val responses = tms.map{
@@ -107,8 +113,18 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
       import context.dispatcher
 
       Future.fold(responses)(true)(_ & _) pipeTo sender
-    }
+    case NotifyWhenTaskManagerTerminated(taskManager) =>
+      val waiting = 
waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + 
sender)
+    case msg@Terminated(taskManager) =>
+      super.receiveWithLogMessages(msg)
 
+      waitForTaskManagerToBeTerminated.get(taskManager.path.name) foreach {
+        _ foreach {
+          listener =>
+            listener ! TaskManagerTerminated(taskManager)
+        }
+      }
     case RequestWorkingTaskManager(jobID) =>
       currentJobs.get(jobID) match {
         case Some((eg, _)) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c59efdc..a5a577b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -48,4 +48,7 @@ object TestingJobManagerMessages {
   case class JobStatusIs(jobID: JobID, state: JobStatus)
 
   case object NotifyListeners
+
+  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+  case class TaskManagerTerminated(taskManager: ActorRef)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index d3a3526..88d3cd0 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -22,6 +22,9 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound,
 ExecutionGraphFound, RequestExecutionGraph}
 
+/**
+ * Mixin for the [[MemoryArchivist]] to support testing messages
+ */
 trait TestingMemoryArchivist extends ActorLogMessages {
   self: MemoryArchivist =>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index f1fe3e5..abdb173 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -18,22 +18,28 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.ActorRef
+import akka.actor.{Terminated, ActorRef}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
 import org.apache.flink.runtime.taskmanager.TaskManager
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
+/**
+ * Mixin for the [[TaskManager]] to support testing messages
+ */
 trait TestingTaskManager extends ActorLogMessages {
   that: TaskManager =>
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, 
Set[ActorRef]]()
   val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, 
Set[ActorRef]]()
+  val waitForJobManagerToBeTerminated = 
scala.collection.mutable.HashMap[String, Set[ActorRef]]()
 
   abstract override def receiveWithLogMessages = {
     receiveTestMessages orElse super.receiveWithLogMessages
@@ -59,21 +65,19 @@ trait TestingTaskManager extends ActorLogMessages {
         case None =>
       }
       
-    case RequestBroadcastVariablesWithReferences => {
+    case RequestBroadcastVariablesWithReferences =>
       sender ! ResponseBroadcastVariablesWithReferences(
         bcVarManager.getNumberOfVariablesWithReferences)
-    }
 
-    case RequestNumActiveConnections => {
+    case RequestNumActiveConnections =>
       networkEnvironment match {
         case Some(ne) => sender ! ResponseNumActiveConnections(
           ne.getConnectionManager.getNumberOfActiveConnections)
 
         case None => sender ! ResponseNumActiveConnections(0)
       }
-    }
 
-    case NotifyWhenJobRemoved(jobID) => {
+    case NotifyWhenJobRemoved(jobID) =>
       if(runningTasks.values.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())
         waitForJobRemoval += (jobID -> (set + sender))
@@ -85,9 +89,8 @@ trait TestingTaskManager extends ActorLogMessages {
           case None => sender ! true
         }
       }
-    }
-    
-    case CheckIfJobRemoved(jobID) => {
+
+    case CheckIfJobRemoved(jobID) =>
       if(runningTasks.values.forall(_.getJobID != jobID)){
         waitForJobRemoval.get(jobID) match {
           case Some(listeners) => listeners foreach (_ ! true)
@@ -97,6 +100,18 @@ trait TestingTaskManager extends ActorLogMessages {
         import context.dispatcher
         context.system.scheduler.scheduleOnce(200 milliseconds, this.self, 
CheckIfJobRemoved(jobID))
       }
-    }
+
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      val waiting = 
waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + 
sender)
+
+    case msg@Terminated(jobManager) =>
+      super.receiveWithLogMessages(msg)
+
+      waitForJobManagerToBeTerminated.get(jobManager.path.name) foreach {
+        _ foreach {
+          _ ! JobManagerTerminated(jobManager)
+        }
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index bbbf3d2..ebe4555 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import akka.actor.ActorRef
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
@@ -44,16 +45,20 @@ object TestingTaskManagerMessages {
   case object RequestRunningTasks
   
   case object RequestBroadcastVariablesWithReferences
+
+  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
+
+  case class JobManagerTerminated(jobManager: ActorRef)
   
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
   
-  def getRequestRunningTasksMessage() : AnyRef = {
+  def getRequestRunningTasksMessage: AnyRef = {
     RequestRunningTasks
   }
   
-  def getRequestBroadcastVariablesWithReferencesMessage() : AnyRef = {
+  def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
     RequestBroadcastVariablesWithReferences
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 5fd133f..a9d8f51 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -22,81 +22,44 @@ import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import com.typesafe.config.ConfigFactory
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.core.io.IOReadableWritable
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
 import 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
 import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import scala.concurrent.duration._
 
 import scala.concurrent.ExecutionContext
+import scala.language.postfixOps
 
+/**
+ * Convenience functions to test actor based components.
+ */
 object TestingUtils {
   val testConfig = 
ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
 
   val TESTING_DURATION = 2 minute
 
-  val DEFAULT_AKKA_ASK_TIMEOUT = 1000
+  val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
 
   def getDefaultTestingActorSystemConfigString: String = {
-    val ioRWSerializerClass = 
classOf[IOReadableWritableSerializer].getCanonicalName
-    val ioRWClass = classOf[IOReadableWritable].getCanonicalName
-
     val logLevel = AkkaUtils.getLogLevel
 
     s"""akka.daemonic = on
       |akka.test.timefactor = 10
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
       |akka.loglevel = $logLevel
-      |akka.stdout-loglevel = "OFF"
+      |akka.stdout-loglevel = $logLevel
      |akka.jvm-exit-on-fatal-error = off
       |akka.log-config-on-start = off
-      |akka.actor.serializers {
-      | IOReadableWritable = "$ioRWSerializerClass"
-      |}
-      |akka.actor.serialization-bindings {
-      | "$ioRWClass" = IOReadableWritable
-      |}
     """.stripMargin
   }
 
-  // scalastyle:off line.size.limit
-  val getTestingSerializationBindings =
-  """
-    |akka {
-    |  actor {
-    |    kryo{
-    |      kryo-custom-serializer-init = 
"org.apache.flink.runtime.testingUtils.KryoTestingInitializer"
-    |    }
-    |
-    |    serialization-bindings {
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$RequestExecutionGraph"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphFound"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphNotFound"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$WaitForAllVerticesToBeRunning"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$AllVerticesRunning"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$NotifyWhenJobRemoved"
 = kryo
-    |
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$NotifyWhenTaskRemoved"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestRunningTasks$"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseRunningTasks"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestBroadcastVariablesWithReferences$"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseBroadcastVariablesWithReferences"
 = kryo
-    |      
"org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$CheckIfJobRemoved"
 = kryo
-    |    }
-    |  }
-    |}
-  """.stripMargin
-  // scalastyle:on line.size.limit
-
-
-  def startTestingTaskManagerWithConfiguration(hostname: String, config: 
Configuration)
+  def startTestingTaskManagerWithConfiguration(hostname: String, 
jobManagerURL: String,
+                                               config: Configuration)
                                               (implicit system: ActorSystem) = 
{
-    val (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfig) =
-      TaskManager.parseConfiguration(hostname, config)
-
+    val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) =
+      TaskManager.parseConfiguration(hostname, config,
+        localAkkaCommunication = true, localTaskManagerCommunication = false)
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager))
   }
@@ -110,24 +73,39 @@ object TestingUtils {
   def startTestingTaskManager(jobManager: ActorRef)(implicit system: 
ActorSystem): ActorRef = {
     val jmURL = jobManager.path.toString
     val config = new Configuration()
-    config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, jmURL)
-    val (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfig) =
-      TaskManager.parseConfiguration("LOCALHOST", config)
+    val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) =
+      TaskManager.parseConfiguration("localhost", config,
+        localAkkaCommunication = true, localTaskManagerCommunication = true)
 
-    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
+    system.actorOf(Props(new TaskManager(connectionInfo, jmURL, 
taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager))
   }
 
-  def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = 
DEFAULT_AKKA_ASK_TIMEOUT):
-  FlinkMiniCluster = {
+  def startTestingCluster(numSlots: Int, numTMs: Int = 1,
+                          timeout: String = DEFAULT_AKKA_ASK_TIMEOUT):
+  TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTMs)
     
config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 
1000)
-    config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
     new TestingCluster(config)
   }
 
+  def startTestingClusterDeathWatch(numSlots: Int, numTMs: Int,
+                                            timeout: String = 
DEFAULT_AKKA_ASK_TIMEOUT):
+  TestingCluster = {
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTMs)
+    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "50 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "200 ms")
+    config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 1)
+
+    new TestingCluster(config, singleActorSystem = false)
+  }
+
   def setGlobalExecutionContext(): Unit = {
     AkkaUtils.globalExecutionContext = ExecutionContext.global
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index 9740c82..c695988 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -86,7 +86,7 @@ object ClosureCleaner {
   private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
     val seen = Set[Class[_]](obj.getClass)
     var stack = List[Class[_]](obj.getClass)
-    while (!stack.isEmpty) {
+    while (stack.nonEmpty) {
       val cr = getClassReader(stack.head)
       stack = stack.tail
       val set = Set[Class[_]]()

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
index 9969dc0..85d8599 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -252,7 +252,7 @@ class CoGroupDataSet[L, R](
         case ( Left(position), order )  => result.add(
                                       new ImmutablePair[java.lang.Integer, 
Order](position, order))
         
-        case ( Right(expression), order ) => {
+        case ( Right(expression), order ) =>
           if (!typeInfo.isInstanceOf[CompositeType[_]]) {
             throw new InvalidProgramException("Specifying order keys via field 
positions is only "
                                    + "valid for composite data types (pojo / 
tuple / case class)")
@@ -265,7 +265,6 @@ class CoGroupDataSet[L, R](
               result.add(new ImmutablePair[java.lang.Integer, Order](k, order))
             }
           }
-        }
       }
       
       result

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
index 122d4ff..d883ba1 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TreeGen.scala
@@ -137,7 +137,7 @@ private[flink] trait TreeGen[C <: Context] { this: 
MacroContextHolder[C] with Ty
         args: List[(String, Type)],
         ret: Type,
         impl: Tree): Tree = {
-      val valParams = args map { case (name, tpe) =>
+      val valParams = args map { case (`name`, tpe) =>
         ValDef(Modifiers(Flag.PARAM), newTermName(name), TypeTree(tpe), 
EmptyTree)
       }
       DefDef(Modifiers(flags), newTermName(name), Nil, List(valParams), 
TypeTree(ret), impl)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 20a41d9..f6630a0 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.language.postfixOps
 
 import scala.reflect.macros.Context
 
@@ -252,7 +253,7 @@ private[flink] trait TypeInformationGen[C <: Context] {
         for (field <- traversalClazz.getDeclaredFields) {
           if (clazzFields.contains(field.getName)) {
             println(s"The field $field is already contained in the " +
-              s"hierarchy of the class ${clazz}. Please use unique field names 
throughout " +
+              s"hierarchy of the class $clazz. Please use unique field names 
throughout " +
               "your class hierarchy")
             error = true
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index eb2441d..6169af3 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -49,10 +49,9 @@ abstract class CaseClassSerializer[T <: Product](
         createInstance(fields)
       }
       catch {
-        case t: Throwable => {
+        case t: Throwable =>
           instanceCreationFailed = true
           null.asInstanceOf[T]
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
index b38764c..6ac5c72 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.scala.examples.socket
 
 import org.apache.flink.streaming.api.scala._
 
+import scala.language.postfixOps
+
 /**
  * This example shows an implementation of WordCount with data from a text 
socket. 
  * To run the example make sure that the service providing the text data is 
already up and running.

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 1091aa3..714686e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.scala.windowing.{Delta, 
Time}
 
 import scala.Stream._
 import scala.math._
+import scala.language.postfixOps
 import scala.util.Random
 
 /**
@@ -78,13 +79,15 @@ object TopSpeedWindowing {
         numOfCars = args(0).toInt
         evictionSec = args(1).toInt
         triggerMeters = args(2).toDouble
+        true
       }
       else {
         System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> 
<triggerMeters>")
         false
       }
+    }else{
+      true
     }
-    true
   }
 
   var numOfCars = 2

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index 0b78365..119862e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala._
 import scala.Stream._
 import scala.util.Random
 import java.util.concurrent.TimeUnit
+import scala.language.postfixOps
 
 object WindowJoin {
 

Reply via email to