Adapted test cases to actor model after rebasing.

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bd4ee47b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bd4ee47b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bd4ee47b

Branch: refs/heads/master
Commit: bd4ee47bb6b15c695e6c6946475c2d38099bad92
Parents: b8d0a0a
Author: Till Rohrmann <[email protected]>
Authored: Fri Nov 14 11:35:45 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/util/ClusterUtil.java       |   1 +
 .../util/AbstractRuntimeUDFContext.java         |   7 +-
 flink-runtime/pom.xml                           |   7 +
 .../runtime/execution/RuntimeEnvironment.java   |   2 +-
 .../librarycache/BlobLibraryCacheManager.java   |   3 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  33 ++-
 .../instance/InstanceConnectionInfo.java        |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   2 -
 .../jobmanager/web/JobmanagerInfoServlet.java   |   3 +-
 .../runtime/jobmanager/web/JsonFactory.java     |   3 +-
 .../taskmanager/TaskInputSplitProvider.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  35 ++-
 .../runtime/messages/JobmanagerMessages.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  27 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  34 +--
 .../flink/runtime/taskmanager/TaskManager.scala |  39 +--
 .../BlobLibraryCacheManagerTest.java            |   3 +-
 .../ExecutionGraphRestartTest.java              | 127 ---------
 .../executiongraph/ExecutionGraphTestUtils.java |  16 +-
 .../ExecutionVertexSchedulingTest.java          |  31 ++-
 .../TaskManagerLossFailsTasksTest.java          |  79 ------
 .../instance/InstanceConnectionInfoTest.java    |   4 +-
 .../runtime/jobmanager/RecoveryITCase.java      | 277 -------------------
 .../jobmanager/tasks/ReceiverBlockingOnce.java  |  52 ----
 .../jobmanager/tasks/ReceiverFailingOnce.java   |  50 ----
 .../flink/runtime/taskmanager/TaskTest.java     |   4 +-
 .../ExecutionGraphRestartTest.scala             | 128 +++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  79 ++++++
 .../runtime/jobmanager/RecoveryITCase.scala     | 176 ++++++++++++
 .../apache/flink/runtime/jobmanager/Tasks.scala |  25 ++
 .../runtime/testingUtils/TestingCluster.scala   |   2 +-
 .../testingUtils/TestingTaskManager.scala       |   5 +-
 .../TestingTaskManagerMessages.scala            |   7 +-
 .../runtime/testingUtils/TestingUtils.scala     |   1 +
 flink-test-utils/pom.xml                        |   6 +
 .../flink/test/util/AbstractTestBase.java       |  44 ++-
 .../flink/test/util/JavaProgramTestBase.java    |   6 +-
 .../apache/flink/test/util/FailingTestBase.java |   6 +-
 38 files changed, 622 insertions(+), 710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index a103dcb..bf5ba73 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 57d1261..6b755e1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.functions.util;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
@@ -98,7 +99,7 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
        }
 
        @Override
-       public <V, A> void addAccumulator(String name, Accumulator<V, A> 
accumulator) {
+       public <V, A extends Serializable> void addAccumulator(String name, 
Accumulator<V, A> accumulator) {
                if (accumulators.containsKey(name)) {
                        throw new UnsupportedOperationException("The counter '" 
+ name
                                        + "' already exists and cannot be 
added.");
@@ -108,7 +109,7 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        @SuppressWarnings("unchecked")
        @Override
-       public <V, A> Accumulator<V, A> getAccumulator(String name) {
+       public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
                return (Accumulator<V, A>) accumulators.get(name);
        }
 
@@ -130,7 +131,7 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
        // 
--------------------------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       private <V, A> Accumulator<V, A> getAccumulator(String name,
+       private <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name,
                        Class<? extends Accumulator<V, A>> accumulatorClass) {
 
                Accumulator<?, ?> accumulator = accumulators.get(name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 6922e7b..f04475c 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -331,6 +331,13 @@ under the License.
                                                </manifest>
                                        </archive>
                                </configuration>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
                        </plugin>
                </plugins>
        </build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index efd4bbb..e91344f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -136,7 +136,7 @@ public class RuntimeEnvironment implements Environment, 
BufferProvider, LocalBuf
                                                        ClassLoader 
userCodeClassLoader,
                                                        MemoryManager 
memoryManager, IOManager ioManager,
                                                        InputSplitProvider 
inputSplitProvider,
-                                                       ActorRef accumulator)
+                                                       ActorRef accumulator,
                                                        
BroadcastVariableManager bcVarManager)
                throws Exception
        {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0bb6fd3..c9f96ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -70,7 +70,8 @@ public final class BlobLibraryCacheManager extends TimerTask 
implements LibraryC
        private final BlobService blobService;
        
        // 
--------------------------------------------------------------------------------------------
-       
+
+       public BlobLibraryCacheManager(BlobService blobService, long 
cleanupInterval){
                this.blobService = blobService;
 
                // Initializing the clean up task

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4e6a56b..915140c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -26,11 +26,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import akka.actor.ActorRef;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -54,6 +56,8 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged;
 import org.apache.flink.util.ExceptionUtils;
 
+import static akka.dispatch.Futures.future;
+
 
 public class ExecutionGraph {
 
@@ -434,18 +438,18 @@ public class ExecutionGraph {
                                                if (current == 
JobStatus.FAILING) {
                                                        if (numberOfRetriesLeft 
> 0 && transitionState(current, JobStatus.RESTARTING)) {
                                                                
numberOfRetriesLeft--;
-                                                               
-                                                               execute(new 
Runnable() {
+                                                               future(new 
Callable<Object>() {
                                                                        
@Override
-                                                                       public 
void run() {
-                                                                               
try {
+                                                                       public 
Object call() throws Exception {
+                                                                               
try{
                                                                                
        Thread.sleep(delayBeforeRetrying);
-                                                                               
} catch (InterruptedException e) {
+                                                                               
}catch(InterruptedException e){
                                                                                
        // should only happen on shutdown
                                                                                
}
                                                                                
restart();
+                                                                               
return null;
                                                                        }
-                                                               });
+                                                               }, 
AkkaUtils.globalExecutionContext());
                                                                break;
                                                        }
                                                        else if 
(numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, 
failureCause)) {
@@ -720,9 +724,7 @@ public class ExecutionGraph {
                        fail(error);
                }
        }
-               }
-       }
-       
+
        public void restart() {
                try {
                        if (state == JobStatus.FAILED) {
@@ -735,23 +737,24 @@ public class ExecutionGraph {
                                if (scheduler == null) {
                                        throw new IllegalStateException("The 
execution graph has not been schedudled before - scheduler is null.");
                                }
-                               
+
                                this.currentExecutions.clear();
                                this.edges.clear();
-                               
+
                                for (ExecutionJobVertex jv : 
this.verticesInCreationOrder) {
                                        jv.resetForNewExecution();
                                }
-                               
+
                                for (int i = 0; i < stateTimestamps.length; 
i++) {
                                        stateTimestamps[i] = 0;
                                }
                                nextVertexToFinish = 0;
                                transitionState(JobStatus.RESTARTING, 
JobStatus.CREATED);
                        }
-                       
+
                        scheduleForExecution(scheduler);
-               }
-               catch (Throwable t) {
+               } catch (Throwable t) {
                        fail(t);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index c574506..20a5601 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -194,7 +194,7 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
 
        @Override
        public String toString() {
-               return hostname() + " ( dataPort=" + dataPort + ")";
+               return getFQDNHostname() + " ( dataPort=" + dataPort + ")";
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2db4ff4..3497824 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -331,8 +331,6 @@ public class JobGraph implements Serializable {
                }
        }
 
-               this.numExecutionRetries = in.readInt();
-               out.writeInt(numExecutionRetries);
        // 
--------------------------------------------------------------------------------------------
        //  Handling of attached JAR files
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index e1afa7a..b842a9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -302,7 +302,8 @@ public class JobmanagerInfoServlet extends HttpServlet {
                                                                wrt.write(",");
                                                        }
                                                        wrt.write("{");
-                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : 
slot.getInstance().getInstanceConnectionInfo().hostname()) + "\",");
+                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : slot
+                                                                       
.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
                                                        wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
                                                        wrt.write("}");
                                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 5fa19e2..8d48975 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -40,7 +40,8 @@ public class JsonFactory {
                json.append("\"vertexstatus\": \"" + vertex.getExecutionState() 
+ "\",");
                
                AllocatedSlot slot = vertex.getCurrentAssignedResource();
-               String instanceName = slot == null ? "(null)" : 
slot.getInstance().getInstanceConnectionInfo().hostname();
+               String instanceName = slot == null ? "(null)" : 
slot.getInstance()
+                               .getInstanceConnectionInfo().getFQDNHostname();
                
                json.append("\"vertexinstancename\": \"" + instanceName + "\"");
                json.append("}");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 669f94c..9853ded 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -39,7 +39,7 @@ public class TaskInputSplitProvider implements 
InputSplitProvider {
        private final FiniteDuration timeout;
        
        public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, 
JobVertexID vertexId,
-                                                                 
FiniteDuration timeout) {
+                                                               FiniteDuration 
timeout) {
                this.jobManager = jobManager;
                this.jobId = jobId;
                this.vertexId = vertexId;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1fa89c1..822a34c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
 import akka.actor._
 import akka.pattern.Patterns
 import akka.pattern.{ask, pipe}
-import com.google.common.base.Preconditions
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -48,7 +47,7 @@ import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.slf4j.LoggerFactory
 
 import scala.collection.convert.WrapAsScala
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Future}
 import scala.concurrent.duration._
 
 class JobManager(val configuration: Configuration) extends
@@ -61,7 +60,11 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
   log.info("Starting job manager.")
 
-  val (archiveCount, profiling, cleanupInterval) = 
JobManager.parseConfiguration(configuration)
+  val (archiveCount,
+    profiling,
+    cleanupInterval,
+    defaultExecutionRetries,
+    delayBetweenRetries) = JobManager.parseConfiguration(configuration)
 
   // Props for the profiler actor
   def profilerProps: Props = Props(classOf[JobManagerProfiler])
@@ -128,13 +131,22 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           log.info(s"Received job ${jobGraph.getJobID} 
(${jobGraph.getName}}).")
 
           // Create the user code class loader
-          libraryCacheManager.register(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
+          libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
 
           val (executionGraph, jobInfo) = 
currentJobs.getOrElseUpdate(jobGraph.getJobID(),
             (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
               jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), 
JobInfo(sender(),
               System.currentTimeMillis())))
 
+          val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
+            jobGraph.getNumberOfExecutionRetries
+          }else{
+            defaultExecutionRetries
+          }
+
+          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
+          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+
           val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
 
           if (userCodeLoader == null) {
@@ -203,7 +215,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
                   s"Cleanup job ${jobGraph.getJobID}.")
               }
             case None =>
-              libraryCacheManager.unregister(jobGraph.getJobID)
+              libraryCacheManager.unregisterJob(jobGraph.getJobID)
               currentJobs.remove(jobGraph.getJobID)
 
           }
@@ -380,7 +392,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     }
 
     try {
-      libraryCacheManager.unregister(jobID)
+      libraryCacheManager.unregisterJob(jobID)
     } catch {
       case t: Throwable =>
         log.error(t, s"Could not properly unregister job ${jobID} form the 
library cache.")
@@ -453,7 +465,7 @@ object JobManager {
     (actorSystem, startActor(configuration))
   }
 
-  def parseConfiguration(configuration: Configuration): (Int, Boolean, Long) = 
{
+  def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, 
Int, Long) = {
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
     val profilingEnabled = 
configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)
@@ -462,7 +474,14 @@ object JobManager {
       .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    (archiveCount, profilingEnabled, cleanupInterval)
+    val executionRetries = configuration.getInteger(ConfigConstants
+      .DEFAULT_EXECUTION_RETRIES_KEY, 
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+
+    val delayBetweenRetries = 2 * configuration.getLong(
+      ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT)
+
+    (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries)
   }
 
   def startActor(configuration: Configuration)(implicit actorSystem: 
ActorSystem): ActorRef = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index eeb3828..b1c5f1f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -200,14 +200,14 @@ object JobManagerMessages {
    * @param jobID
    * @param msg
    */
-  case class JobResultCanceled(jobID: JobID, msg: String)
+  case class JobResultCanceled(jobID: JobID, msg: String) extends JobResult
 
   /**
    * Denotes a failed job execution.
    * @param jobID
    * @param msg
    */
-  case class JobResultFailed(jobID: JobID, msg:String)
+  case class JobResultFailed(jobID: JobID, msg:String) extends JobResult
 
   sealed trait SubmissionResponse{
     def jobID: JobID

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6d0da27..43be786 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -25,6 +25,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.client.JobClient
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
@@ -53,6 +54,8 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration) {
 
   val (taskManagerActorSystems, taskManagerActors) = 
actorSystemsTaskManagers.unzip
 
+  val jobClientActorSystem = AkkaUtils.createActorSystem()
+
   waitForTaskManagersToBeRegistered()
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
@@ -76,16 +79,34 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration) {
       configuration)
   }
 
-  def getJobManager: ActorRef = {
-    jobManagerActor
+  def getJobClient(): ActorRef ={
+    val config = new Configuration()
+
+    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
FlinkMiniCluster.HOSTNAME)
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
getJobManagerRPCPort)
+
+    JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
   }
 
+  def getJobClientActorSystem: ActorSystem = jobClientActorSystem
+
+  def getJobManagerRPCPort: Int = {
+    configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
+  }
 
+  def getJobManager: ActorRef = {
+    jobManagerActor
+  }
 
   def getTaskManagers = {
     taskManagerActors
   }
 
+  def getTaskManagersAsJava = {
+    import collection.JavaConverters._
+    taskManagerActors.asJava
+  }
+
   def stop(): Unit = {
     LOG.info("Stopping FlinkMiniCluster.")
     shutdown()
@@ -95,9 +116,11 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration) {
   def shutdown(): Unit = {
     taskManagerActorSystems foreach { _.shutdown() }
     jobManagerActorSystem.shutdown()
+    jobClientActorSystem.shutdown()
   }
 
   def awaitTermination(): Unit = {
+    jobClientActorSystem.awaitTermination()
     taskManagerActorSystems foreach { _.awaitTermination()}
     jobManagerActorSystem.awaitTermination()
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index fb6e36b..edf16bb 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,22 +18,17 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.io.File
-
 import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, 
Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.slf4j.LoggerFactory
-import scopt.OptionParser
 
 class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
 FlinkMiniCluster(userConfiguration){
 
-  val actorSystem = AkkaUtils.createActorSystem()
-
   override def generateConfiguration(userConfiguration: Configuration): 
Configuration = {
     val forNumberString = System.getProperty("forkNumber")
 
@@ -87,33 +82,6 @@ FlinkMiniCluster(userConfiguration){
 
     TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, config, 
false)(system)
   }
-
-  def getJobClient(): ActorRef ={
-    val config = new Configuration()
-
-    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
FlinkMiniCluster.HOSTNAME)
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
getJobManagerRPCPort)
-
-    JobClient.startActorWithConfiguration(config)(actorSystem)
-  }
-
-  def getJobClientActorSystem: ActorSystem = actorSystem
-
-  override def shutdown(): Unit = {
-    super.shutdown()
-
-    actorSystem.shutdown()
-  }
-
-  override def awaitTermination(): Unit = {
-    actorSystem.awaitTermination()
-
-    super.awaitTermination()
-  }
-
-  def getJobManagerRPCPort: Int = {
-    configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
-  }
 }
 
 object LocalFlinkMiniCluster{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 66d25c5..7004881 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -32,13 +32,14 @@ import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobCache
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.execution.{ExecutionState, RuntimeEnvironment}
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
 FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, 
HardwareDescription, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
 import org.apache.flink.runtime.io.network.{NetworkConnectionManager, 
LocalConnectionManager,
 ChannelManager}
@@ -81,8 +82,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, 
val jobManagerAkka
   val HEARTBEAT_INTERVAL = 1000 millisecond
 
   TaskManager.checkTempDirs(tmpDirPaths)
-  val ioManager = new IOManager(tmpDirPaths)
+  val ioManager = new IOManagerAsync(tmpDirPaths)
   val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, 
pageSize)
+  val bcVarManager = new BroadcastVariableManager();
   val hardwareDescription = 
HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
   val fileCache = new FileCache()
   val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, 
Task]()
@@ -232,7 +234,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
       var jarsRegistered = false
 
       try {
-        libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
+        libraryCacheManager.registerTask(jobID, executionID, 
tdd.getRequiredJarFiles());
         jarsRegistered = true
 
         val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
@@ -252,7 +254,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 
         val splitProvider = new TaskInputSplitProvider(currentJobManager, 
jobID, vertexID, timeout)
         val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, 
memoryManager,
-          ioManager, splitProvider,currentJobManager)
+          ioManager, splitProvider, currentJobManager, bcVarManager)
 
         task.setEnvironment(env)
 
@@ -297,7 +299,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 
           if (jarsRegistered) {
             try {
-              libraryCacheManager.unregister(jobID)
+              libraryCacheManager.unregisterTask(jobID, executionID)
             } catch {
               case ioe: IOException =>
                 log.debug(s"Unregistering the execution ${executionID} caused 
an IOException.")
@@ -350,7 +352,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
           task.unregisterMemoryManager(memoryManager)
 
           try {
-            libraryCacheManager.unregister(task.getJobID)
+            libraryCacheManager.unregisterTask(task.getJobID, executionID)
           } catch {
             case ioe: IOException =>
               log.error(ioe, s"Unregistering the execution ${executionID} 
caused an IOException.")
@@ -543,17 +545,6 @@ object TaskManager {
 
     val numberOfSlots = if (slots > 0) slots else 1
 
-    val configuredMemory: Long = configuration.getInteger(ConfigConstants
-      .TASK_MANAGER_MEMORY_SIZE_KEY, -1)
-
-    val memorySize = if (configuredMemory > 0) {
-      configuredMemory << 20
-    } else {
-      val fraction = 
configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
-      (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag * 
fraction).toLong
-    }
-
     val pageSize = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
 
@@ -585,6 +576,20 @@ object TaskManager {
         lowWaterMark, highWaterMark)
     }
 
+    val networkBufferMem = if(localExecution) 0 else numBuffers * bufferSize;
+
+    val configuredMemory: Long = configuration.getInteger(ConfigConstants
+      .TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+
+    val memorySize = if (configuredMemory > 0) {
+      configuredMemory << 20
+    } else {
+      val fraction = 
configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - 
networkBufferMem ) * fraction)
+        .toLong
+    }
+
 
     val memoryLoggingIntervalMs = configuration.getBoolean(ConfigConstants
       .TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 0cf3e02..71c0669 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -59,7 +59,8 @@ public class BlobLibraryCacheManagerTest {
                        buf[0] += 1;
                        keys.add(bc.put(buf));
 
-                       libraryCacheManager = new 
BlobLibraryCacheManager(server, GlobalConfiguration.getConfiguration());
+                       long cleanupInterval = 1000l;
+                       libraryCacheManager = new 
BlobLibraryCacheManager(server, cleanupInterval);
                        libraryCacheManager.registerJob(jid, keys);
 
                        List<File> files = new ArrayList<File>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
deleted file mode 100644
index f1855f2..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
-import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.junit.Test;
-
-public class ExecutionGraphRestartTest {
-       
-       @Test
-       public void testRestartManually() {
-               final int NUM_TASKS = 31;
-               
-               try {
-                       TaskOperationProtocol tm = 
getSimpleAcknowledgingTaskmanager();
-                       Instance instance = getInstance(tm);
-                       
-                       Scheduler scheduler = new Scheduler();
-                       scheduler.newInstanceAvailable(instance);
-                       
-                       // The job:
-                       
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Task");
-                       sender.setInvokableClass(NoOpInvokable.class);
-                       sender.setParallelism(NUM_TASKS);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(new JobID(), 
"test job", new Configuration());
-                       eg.setNumberOfRetriesLeft(0);
-                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-                       
-                       assertEquals(JobStatus.CREATED, eg.getState());
-                       
-                       eg.scheduleForExecution(scheduler);
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-                       
-                       eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
-                       assertEquals(JobStatus.FAILED, eg.getState());
-                       
-                       eg.restart();
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-                       
-                       for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-                               v.executionFinished();
-                       }
-                       assertEquals(JobStatus.FINISHED, eg.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testRestartSelf() {
-               final int NUM_TASKS = 31;
-               
-               try {
-                       TaskOperationProtocol tm = 
getSimpleAcknowledgingTaskmanager();
-                       Instance instance = getInstance(tm);
-                       
-                       Scheduler scheduler = new Scheduler();
-                       scheduler.newInstanceAvailable(instance);
-                       
-                       // The job:
-                       
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Task");
-                       sender.setInvokableClass(NoOpInvokable.class);
-                       sender.setParallelism(NUM_TASKS);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(new JobID(), 
"test job", new Configuration());
-                       eg.setNumberOfRetriesLeft(1);
-                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-                       
-                       assertEquals(JobStatus.CREATED, eg.getState());
-                       
-                       eg.scheduleForExecution(scheduler);
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-                       
-                       eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
-                       
-                       // should have restarted itself
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-                       
-                       for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-                               v.executionFinished();
-                       }
-                       assertEquals(JobStatus.FINISHED, eg.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a9abd0c..fdeaafd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -94,17 +94,19 @@ public class ExecutionGraphTestUtils {
        // 
--------------------------------------------------------------------------------------------
        //  utility mocking methods
        // 
--------------------------------------------------------------------------------------------
-       
-       public static Instance getInstance(final ActorRef taskManager) throws 
Exception {
-               return getInstance(top, 1);
+
+       public static Instance getInstance(final ActorRef taskManager) throws
+                       Exception {
+               return getInstance(taskManager, 1);
        }
-       
-       public static Instance getInstance(final TaskOperationProtocol top, int 
numSlots) throws Exception {
-               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+
+       public static Instance getInstance(final ActorRef taskManager, final 
int numberOfSlots) throws
+                       Exception {
+                               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
                
-               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, 1);
+               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, numberOfSlots);
        }
 
        public static class SimpleAcknowledgingTaskManager extends UntypedActor 
{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index e6e71e6..29a1b3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
@@ -33,12 +37,26 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.mockito.Matchers;
 
 public class ExecutionVertexSchedulingTest {
+       private static ActorSystem system;
 
+       @BeforeClass
+       public static void setup(){
+               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
+       }
+
+       @AfterClass
+       public static void teardown(){
+               JavaTestKit.shutdownActorSystem(system);
+               system = null;
+       }
        
        @Test
        public void testSlotReleasedWhenScheduledImmediately() {
@@ -108,10 +126,13 @@ public class ExecutionVertexSchedulingTest {
        }
        
        @Test
-       public void testScheduleToDeploy() {
+       public void testScheduleToRunning() {
                try {
-                       // a slot than cannot be deployed to
-                       final Instance instance = 
getInstance(ActorRef.noSender());
+                       TestingUtils.setCallingThreadDispatcher(system);
+                       ActorRef tm = TestActorRef.create(system, 
Props.create(ExecutionGraphTestUtils
+                                       .SimpleAcknowledgingTaskManager.class));
+
+                       final Instance instance = getInstance(tm);
                        final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(new 
JobVertexID());
@@ -124,11 +145,13 @@ public class ExecutionVertexSchedulingTest {
 
                        // try to deploy to the slot
                        vertex.scheduleForExecution(scheduler, false);
-                       assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
+               }finally{
+                       TestingUtils.setGlobalExecutionContext();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
deleted file mode 100644
index 37bdaa3..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
-import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.junit.Test;
-
-public class TaskManagerLossFailsTasksTest {
-
-       @Test
-       public void testTasksFailWhenTaskManagerLost() {
-               try {
-                       TaskOperationProtocol tm1 = 
getSimpleAcknowledgingTaskmanager();
-                       TaskOperationProtocol tm2 = 
getSimpleAcknowledgingTaskmanager();
-                       
-                       Instance instance1 = getInstance(tm1, 10);
-                       Instance instance2 = getInstance(tm2, 10);
-                       
-                       Scheduler scheduler = new Scheduler();
-                       scheduler.newInstanceAvailable(instance1);
-                       scheduler.newInstanceAvailable(instance2);
-                       
-                       // The job:
-                       
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Task");
-                       sender.setInvokableClass(NoOpInvokable.class);
-                       sender.setParallelism(20);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(new JobID(), 
"test job", new Configuration());
-                       eg.setNumberOfRetriesLeft(0);
-                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-                       
-                       
-                       assertEquals(JobStatus.CREATED, eg.getState());
-                       
-                       eg.scheduleForExecution(scheduler);
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-                       
-                       instance1.markDead();
-                       assertEquals(JobStatus.FAILING, eg.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index 3c371d6..c072e59 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -115,7 +115,7 @@ public class InstanceConnectionInfoTest {
        @Test
        public void testGetHostname0() {
                try {
-                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
10523, 19871));
+                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
19871));
                        Whitebox.setInternalState(info1, "fqdnHostName", 
"worker2.cluster.mycompany.com");
                        Assert.assertEquals("worker2", info1.getHostname());
                } catch (Exception e) {
@@ -127,7 +127,7 @@ public class InstanceConnectionInfoTest {
        @Test
        public void testGetHostname1() {
                try {
-                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
10523, 19871));
+                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
19871));
                        Whitebox.setInternalState(info1, "fqdnHostName", 
"worker10");
                        Assert.assertEquals("worker10", info1.getHostname());
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
deleted file mode 100644
index be3e765..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
-import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.AbstractJobResult;
-import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.LocalInstanceManager;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce;
-import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce;
-import org.apache.flink.runtime.jobmanager.tasks.Sender;
-import org.junit.Test;
-
-/**
- * This test is intended to cover the basic functionality of the {@link 
JobManager}.
- */
-public class RecoveryITCase {
-       
-       @Test
-       public void testForwardJob() {
-               
-               ReceiverFailingOnce.resetFailedBefore();
-               
-               final int NUM_TASKS = 31;
-               
-               JobManager jm = null;
-               
-               try {
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Sender");
-                       final AbstractJobVertex receiver = new 
AbstractJobVertex("Receiver");
-                       
-                       sender.setInvokableClass(Sender.class);
-                       receiver.setInvokableClass(ReceiverFailingOnce.class);
-                       
-                       sender.setParallelism(NUM_TASKS);
-                       receiver.setParallelism(NUM_TASKS);
-                       
-                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
-                       jobGraph.setNumberOfExecutionRetries(1);
-                       
-                       jm = startJobManager(2 * NUM_TASKS);
-                       
-                       final GlobalBufferPool bp = ((LocalInstanceManager) 
jm.getInstanceManager())
-                                       
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
-                       
-                       JobSubmissionResult result = jm.submitJob(jobGraph);
-
-                       if (result.getReturnCode() != 
AbstractJobResult.ReturnCode.SUCCESS) {
-                               System.out.println(result.getDescription());
-                       }
-                       assertEquals(AbstractJobResult.ReturnCode.SUCCESS, 
result.getReturnCode());
-                       
-                       // monitor the execution
-                       ExecutionGraph eg = 
jm.getCurrentJobs().get(jobGraph.getJobID());
-                       
-                       if (eg != null) {
-                               eg.waitForJobEnd();
-                               
-                               if (eg.getState() != JobStatus.FINISHED) {
-                                       Throwable t = eg.getFailureCause();
-                                       String message = null;
-                                       
-                                       if (t != null) {
-                                               t.printStackTrace();
-                                               message = t.getMessage();
-                                       }
-                                       fail("Execution failed despite 
recovery: " + message);
-                               }
-                       }
-                       else {
-                               // already done, that was fast;
-                       }
-                       
-                       // make sure that in any case, the network buffers are 
all returned
-                       waitForTaskThreadsToBeTerminated();
-                       assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (jm != null) {
-                               jm.shutdown();
-                       }
-               }
-       }
-       
-       @Test
-       public void testForwardJobWithSlotSharing() {
-               
-               ReceiverFailingOnce.resetFailedBefore();
-               
-               final int NUM_TASKS = 31;
-               
-               JobManager jm = null;
-               
-               try {
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Sender");
-                       final AbstractJobVertex receiver = new 
AbstractJobVertex("Receiver");
-                       
-                       sender.setInvokableClass(Sender.class);
-                       receiver.setInvokableClass(ReceiverFailingOnce.class);
-                       
-                       sender.setParallelism(NUM_TASKS);
-                       receiver.setParallelism(NUM_TASKS);
-                       
-                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
-                       
-                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       sender.setSlotSharingGroup(sharingGroup);
-                       receiver.setSlotSharingGroup(sharingGroup);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
-                       jobGraph.setNumberOfExecutionRetries(1);
-                       
-                       jm = startJobManager(NUM_TASKS);
-                       
-                       final GlobalBufferPool bp = ((LocalInstanceManager) 
jm.getInstanceManager())
-                                       
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
-                       
-                       JobSubmissionResult result = jm.submitJob(jobGraph);
-
-                       if (result.getReturnCode() != 
AbstractJobResult.ReturnCode.SUCCESS) {
-                               System.out.println(result.getDescription());
-                       }
-                       assertEquals(AbstractJobResult.ReturnCode.SUCCESS, 
result.getReturnCode());
-                       
-                       // monitor the execution
-                       ExecutionGraph eg = 
jm.getCurrentJobs().get(jobGraph.getJobID());
-                       
-                       if (eg != null) {
-                               eg.waitForJobEnd();
-                               
-                               if (eg.getState() != JobStatus.FINISHED) {
-                                       Throwable t = eg.getFailureCause();
-                                       String message = null;
-                                       
-                                       if (t != null) {
-                                               t.printStackTrace();
-                                               message = t.getMessage();
-                                       }
-                                       fail("Execution failed despite 
recovery: " + message);
-                               }
-                       }
-                       else {
-                               // already done, that was fast;
-                       }
-                       
-                       // make sure that in any case, the network buffers are 
all returned
-                       waitForTaskThreadsToBeTerminated();
-                       assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (jm != null) {
-                               jm.shutdown();
-                       }
-               }
-       }
-       
-       @Test
-       public void testRecoverTaskManagerFailure() {
-               
-               final int NUM_TASKS = 31;
-               
-               JobManager jm = null;
-               
-               try {
-                       final AbstractJobVertex sender = new 
AbstractJobVertex("Sender");
-                       final AbstractJobVertex receiver = new 
AbstractJobVertex("Receiver");
-                       
-                       sender.setInvokableClass(Sender.class);
-                       receiver.setInvokableClass(ReceiverBlockingOnce.class);
-                       sender.setParallelism(NUM_TASKS);
-                       receiver.setParallelism(NUM_TASKS);
-                       
-                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
-                       
-                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       sender.setSlotSharingGroup(sharingGroup);
-                       receiver.setSlotSharingGroup(sharingGroup);
-                       
-                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
-                       jobGraph.setNumberOfExecutionRetries(1);
-                       
-                       // make sure we have fast heartbeats and failure 
detection
-                       Configuration cfg = new Configuration();
-                       
cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000);
-                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000);
-                       
-                       jm = startJobManager(2, NUM_TASKS, cfg);
-                       
-                       JobSubmissionResult result = jm.submitJob(jobGraph);
-
-                       if (result.getReturnCode() != 
AbstractJobResult.ReturnCode.SUCCESS) {
-                               System.out.println(result.getDescription());
-                       }
-                       assertEquals(AbstractJobResult.ReturnCode.SUCCESS, 
result.getReturnCode());
-                       
-                       // monitor the execution
-                       ExecutionGraph eg = 
jm.getCurrentJobs().get(jobGraph.getJobID());
-                       
-                       // wait for a bit until all is running, make sure the 
second attempt does not block
-                       Thread.sleep(300);
-                       ReceiverBlockingOnce.setShouldNotBlock();
-                       
-                       // shutdown one of the taskmanagers
-                       ((LocalInstanceManager) 
jm.getInstanceManager()).getTaskManagers()[0].shutdown();
-                       
-                       // wait for the recovery to do its work
-                       if (eg != null) {
-                               eg.waitForJobEnd();
-                               
-                               if (eg.getState() != JobStatus.FINISHED) {
-                                       Throwable t = eg.getFailureCause();
-                                       String message = null;
-                                       
-                                       if (t != null) {
-                                               t.printStackTrace();
-                                               message = t.getMessage();
-                                       }
-                                       fail("Execution failed despite 
recovery: " + message);
-                               }
-                       }
-                       else {
-                               // already done, that was fast;
-                       }
-                       
-                       // make sure that in any case, the network buffers are 
all returned
-                       waitForTaskThreadsToBeTerminated();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (jm != null) {
-                               jm.shutdown();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
deleted file mode 100644
index 3425842..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.tasks;
-
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.types.IntegerRecord;
-
-public final class ReceiverBlockingOnce extends AbstractInvokable {
-       
-       private static boolean shouldBlock = true;
-
-       private RecordReader<IntegerRecord> reader;
-       
-       @Override
-       public void registerInputOutput() {
-               reader = new RecordReader<IntegerRecord>(this, 
IntegerRecord.class);
-       }
-
-       @Override
-       public void invoke() throws Exception {
-               if (shouldBlock) {
-                       
-                       Object o = new Object();
-                       synchronized (o) {
-                               o.wait();
-                       }
-               }
-               
-               while (reader.next() != null);
-       }
-       
-       public static void setShouldNotBlock() {
-               shouldBlock = false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
deleted file mode 100644
index 3fad6b1..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.tasks;
-
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.types.IntegerRecord;
-
-public final class ReceiverFailingOnce extends AbstractInvokable {
-       
-       private static boolean hasFailedBefore = false;
-
-       private RecordReader<IntegerRecord> reader;
-       
-       @Override
-       public void registerInputOutput() {
-               reader = new RecordReader<IntegerRecord>(this, 
IntegerRecord.class);
-       }
-
-       @Override
-       public void invoke() throws Exception {
-               if (!hasFailedBefore && 
getEnvironment().getIndexInSubtaskGroup() == 0) {
-                       hasFailedBefore = true;
-                       throw new Exception("Test exception");
-               }
-               
-               while (reader.next() != null);
-       }
-       
-       
-       public static void resetFailedBefore() {
-               hasFailedBefore = false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index e00439b..4b0a6ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -265,7 +265,7 @@ public class TaskTest {
                        
                        RuntimeEnvironment env = new RuntimeEnvironment(task, 
tdd, getClass().getClassLoader(),
                                        mock(MemoryManager.class), 
mock(IOManager.class), mock(InputSplitProvider.class),
-                                       mock(ActorRef.class));
+                                       mock(ActorRef.class), new 
BroadcastVariableManager());
                        
                        task.setEnvironment(env);
                        
@@ -303,7 +303,7 @@ public class TaskTest {
                        
                        RuntimeEnvironment env = new RuntimeEnvironment(task, 
tdd, getClass().getClassLoader(),
                                        mock(MemoryManager.class), 
mock(IOManager.class), mock(InputSplitProvider.class),
-                                       mock(ActorRef.class));
+                                       mock(ActorRef.class), new 
BroadcastVariableManager());
                        
                        task.setEnvironment(env);
                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
new file mode 100644
index 0000000..f4ce7b0
--- /dev/null
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph
+
+import akka.actor.{Props, ActorSystem}
+import akka.testkit.{TestKit}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+class ExecutionGraphRestartTest(_system: ActorSystem) extends TestKit(_system) 
with WordSpecLike
+with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  val NUM_TASKS = 31
+
+  "The execution graph" must {
+    "be manually restartable" in {
+      try {
+        val tm = system.actorOf(Props(classOf[ExecutionGraphTestUtils
+        .SimpleAcknowledgingTaskManager], "TaskManager"))
+        val instance = ExecutionGraphTestUtils.getInstance(tm)
+
+        val scheduler = new Scheduler
+        scheduler.newInstanceAvailable(instance)
+
+        val sender = new AbstractJobVertex("Task")
+        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+        sender.setParallelism(NUM_TASKS)
+
+        val jobGraph = new JobGraph("Pointwise job", sender)
+
+        val eg = new ExecutionGraph(new JobID(), "test job", new 
Configuration())
+        eg.setNumberOfRetriesLeft(0)
+        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
+
+        eg.getState should equal(JobStatus.CREATED)
+
+        eg.scheduleForExecution(scheduler)
+        eg.getState should equal(JobStatus.RUNNING)
+
+        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test 
Exception"))
+        eg.getState should equal(JobStatus.FAILED)
+
+        eg.restart()
+        eg.getState should equal(JobStatus.RUNNING)
+
+        import collection.JavaConverters._
+        for (vertex <- eg.getAllExecutionVertices.asScala) {
+          vertex.executionFinished()
+        }
+
+        eg.getState should equal(JobStatus.FINISHED)
+      } catch {
+        case t: Throwable =>
+          t.printStackTrace()
+          fail(t.getMessage)
+      }
+    }
+
+    "restart itself automatically" in {
+      try {
+        val tm = system.actorOf(Props
+          (classOf[ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager], 
"TaskManager"))
+        val instance = ExecutionGraphTestUtils.getInstance(tm)
+
+        val scheduler = new Scheduler
+        scheduler.newInstanceAvailable(instance)
+
+        val sender = new AbstractJobVertex("Task")
+        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+        sender.setParallelism(NUM_TASKS)
+
+        val jobGraph = new JobGraph("Pointwise job", sender)
+
+        val eg = new ExecutionGraph(new JobID(), "Test job", new 
Configuration())
+        eg.setNumberOfRetriesLeft(1)
+        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
+
+        eg.getState should equal(JobStatus.CREATED)
+
+        eg.scheduleForExecution(scheduler)
+        eg.getState should equal(JobStatus.RUNNING)
+
+        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test 
Exception"))
+
+        eg.getState should equal(JobStatus.RUNNING)
+
+        import collection.JavaConverters._
+        for (vertex <- eg.getAllExecutionVertices.asScala) {
+          vertex.executionFinished()
+        }
+
+        eg.getState should equal(JobStatus.FINISHED)
+      }catch{
+        case t: Throwable =>
+          t.printStackTrace()
+          fail(t.getMessage)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
new file mode 100644
index 0000000..9884bca
--- /dev/null
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph
+
+import akka.actor.{Props, ActorSystem}
+import akka.testkit.TestKit
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionGraphTestUtils}
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
+.SimpleAcknowledgingTaskManager
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+class TaskManagerLossFailsTasksTest(_system: ActorSystem) extends 
TestKit(_system) with
+WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "A task manager loss" must {
+    "fail the assigned tasks" in {
+      try {
+        val tm1 = 
system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager1"))
+        val tm2 = 
system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager2"))
+
+        val instance1 = ExecutionGraphTestUtils.getInstance(tm1, 10)
+        val instance2 = ExecutionGraphTestUtils.getInstance(tm2, 10)
+
+        val scheduler = new Scheduler
+        scheduler.newInstanceAvailable(instance1)
+        scheduler.newInstanceAvailable(instance2)
+
+        val sender = new AbstractJobVertex("Task")
+        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+        sender.setParallelism(20)
+
+        val jobGraph = new JobGraph("Pointwise job", sender)
+
+        val eg = new ExecutionGraph(new JobID(), "test job", new 
Configuration())
+        eg.setNumberOfRetriesLeft(0)
+        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
+
+        eg.getState should equal(JobStatus.CREATED)
+
+        eg.scheduleForExecution(scheduler)
+        eg.getState should equal(JobStatus.RUNNING)
+
+        instance1.markDead()
+        eg.getState should equal(JobStatus.FAILING)
+      }catch{
+        case t:Throwable =>
+          t.printStackTrace()
+          fail(t.getMessage)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
new file mode 100644
index 0000000..cc96f4b
--- /dev/null
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import akka.actor.{PoisonPill, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, 
FailingOnceReceiver}
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmissionSuccess,
+SubmitJob}
+import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.junit.JUnitRunner
+
@RunWith(classOf[JUnitRunner])
class RecoveryITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
  WordSpecLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))

  // Parenthesized to match the ScalaTest declaration and the convention used
  // by the other suites in this commit (was the parameterless form before).
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  // Degree of parallelism for both the sender and the receiver vertex.
  val NUM_TASKS = 31

  /**
   * Builds the sender -> receiver pointwise job used by all tests here.
   * The three tests previously duplicated this construction verbatim.
   *
   * @param slotSharing whether sender and receiver share a slot sharing group
   * @param configureReceiver hook setting the receiver's invokable class
   * @return job graph configured with one execution retry
   */
  private def pointwiseJob(slotSharing: Boolean)
                          (configureReceiver: AbstractJobVertex => Unit): JobGraph = {
    val sender = new AbstractJobVertex("Sender")
    val receiver = new AbstractJobVertex("Receiver")

    sender.setInvokableClass(classOf[Tasks.Sender])
    configureReceiver(receiver)

    sender.setParallelism(NUM_TASKS)
    receiver.setParallelism(NUM_TASKS)

    receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)

    if (slotSharing) {
      val sharingGroup = new SlotSharingGroup
      sender.setSlotSharingGroup(sharingGroup)
      receiver.setSlotSharingGroup(sharingGroup)
    }

    val jobGraph = new JobGraph("Pointwise job", sender, receiver)
    // Exactly one retry: each test provokes a single failure and expects
    // the job to still finish successfully.
    jobGraph.setNumberOfExecutionRetries(1)
    jobGraph
  }

  // NOTE(review): the catch(Throwable)/fail(t.getMessage) wrappers of the
  // original were removed throughout — they discard the stack trace while
  // ScalaTest reports the exception anyway; only the finally-stop remains.

  "The recovery" must {
    "recover once failing forward job" in {
      FailingOnceReceiver.failed = false

      val jobGraph = pointwiseJob(slotSharing = false) { receiver =>
        receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
      }

      // Without slot sharing, sender and receiver need one slot each.
      val cluster = TestingUtils.startTestingCluster(2 * NUM_TASKS)
      val jm = cluster.getJobManager

      try {
        within(TestingUtils.TESTING_DURATION) {
          jm ! SubmitJob(jobGraph)

          expectMsg(SubmissionSuccess(jobGraph.getJobID))

          val result = expectMsgType[JobResultSuccess]
          result.jobID should equal(jobGraph.getJobID)
        }
      } finally {
        cluster.stop()
      }
    }

    "recover once failing forward job with slot sharing" in {
      FailingOnceReceiver.failed = false

      val jobGraph = pointwiseJob(slotSharing = true) { receiver =>
        receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
      }

      // With slot sharing, a sender/receiver pair fits into one slot.
      val cluster = TestingUtils.startTestingCluster(NUM_TASKS)
      val jm = cluster.getJobManager

      try {
        within(TestingUtils.TESTING_DURATION) {
          jm ! SubmitJob(jobGraph)

          expectMsg(SubmissionSuccess(jobGraph.getJobID))

          val result = expectMsgType[JobResultSuccess]
          result.jobID should equal(jobGraph.getJobID)
        }
      } finally {
        cluster.stop()
      }
    }

    "recover a task manager failure" in {
      BlockingOnceReceiver.blocking = true

      val jobGraph = pointwiseJob(slotSharing = true) { receiver =>
        receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver])
      }

      // Two task managers so the job can be rescheduled after one is killed.
      val cluster = TestingUtils.startTestingCluster(NUM_TASKS, 2)

      val jm = cluster.getJobManager
      val taskManagers = cluster.getTaskManagers

      try {
        within(TestingUtils.TESTING_DURATION) {
          jm ! SubmitJob(jobGraph)

          expectMsg(SubmissionSuccess(jobGraph.getJobID))

          // NOTE(review): fixed sleep gives the tasks time to get deployed
          // before the task manager is killed — inherently racy; a
          // deployment notification would be more robust. TODO confirm.
          Thread.sleep(300)
          BlockingOnceReceiver.blocking = false
          taskManagers(0) ! PoisonPill

          val result = expectMsgType[JobResultSuccess]
          result.jobID should equal(jobGraph.getJobID)
        }
      } finally {
        cluster.stop()
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index bee9578..3306374 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -76,6 +76,31 @@ object Tasks {
     }
   }
 
+  class FailingOnceReceiver extends Receiver {
+    import FailingOnceReceiver.failed
+
+    override def invoke(): Unit = {
+      if(!failed && getEnvironment.getIndexInSubtaskGroup == 0){
+        failed = true
+        throw new Exception("Test exception.")
+      }else{
+        super.invoke()
+      }
+    }
+  }
+
+  object FailingOnceReceiver{
+    var failed = false
+  }
+
+  class BlockingOnceReceiver extends Receiver {
+
+  }
+
+  object BlockingOnceReceiver{
+    var blocking = true
+  }
+
   class AgnosticReceiver extends AbstractInvokable {
     var reader: RecordReader[IntegerRecord] = _
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 480bc1b..9961ada 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -47,6 +47,6 @@ class TestingCluster(userConfiguration: Configuration) 
extends FlinkMiniCluster(
       TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, 
true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
-      networkConnectionConfig)), TaskManager.TASK_MANAGER_NAME + index)
+      networkConnectionConfig) with TestingTaskManager), 
TaskManager.TASK_MANAGER_NAME + index)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 768a5b0..5c6cca1 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorRef
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{NotifyWhenTaskRemoved,
 ResponseRunningTasks, RequestRunningTasks}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
 
 trait TestingTaskManager extends ActorLogMessages {
@@ -50,5 +50,8 @@ trait TestingTaskManager extends ActorLogMessages {
         case Some(actors) => for(actor <- actors) actor ! true
         case None =>
       }
+    case RequestBroadcastVariablesWithReferences => {
+      sender() ! 
ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 674f0d7..24d7e5c 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.testingUtils
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.taskmanager.Task
 
-import scala.collection.convert.DecorateAsJava
-
-object TestingTaskManagerMessages extends DecorateAsJava{
/**
 * Messages understood by the [[TestingTaskManager]] mixin in addition to the
 * regular task manager messages.
 */
object TestingTaskManagerMessages {
  /** Asks to be notified (with `true`) once the given execution is removed. */
  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)

  /** Requests the map of currently running tasks. */
  case object RequestRunningTasks

  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]) {
    // Absolute import: the relative `import collection.JavaConverters._`
    // silently breaks if a `collection` package ever appears on the path.
    import scala.collection.JavaConverters._

    /** Java view of the running tasks, for Java-side test code. */
    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
  }

  /** Requests the number of broadcast variables that still have references. */
  case object RequestBroadcastVariablesWithReferences

  case class ResponseBroadcastVariablesWithReferences(number: Int)
}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 126ae10..c1565c7 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -71,6 +71,7 @@ object TestingUtils {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskManagers)
+    
config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 
1000)
     val cluster = new TestingCluster(config)
     cluster
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index b3cbae6..d77318e 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -52,6 +52,12 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients</artifactId>
                        <version>${project.version}</version>
                </dependency>

Reply via email to