Reworked local cluster start. The TaskManager watches the JobManager and 
attempts re-registration if it gets disconnected. Introduced the 
akka.ask.timeout config parameter to configure Akka timeouts.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b8d0a0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b8d0a0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b8d0a0aa

Branch: refs/heads/master
Commit: b8d0a0aadaed012ccb93176dcc21acbdbd005eea
Parents: 26c7794
Author: Till Rohrmann <[email protected]>
Authored: Wed Nov 12 19:48:36 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 16 +++++--
 .../org/apache/flink/client/program/Client.java |  9 +++-
 .../flink/client/web/JobsInfoServlet.java       | 11 ++++-
 .../flink/client/web/WebInterfaceServer.java    |  5 ++-
 .../apache/flink/client/program/ClientTest.java |  6 ++-
 .../flink/configuration/ConfigConstants.java    |  7 +++
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |  2 +-
 .../src/main/flink-bin/bin/start-local.bat      |  2 +-
 .../flink/runtime/executiongraph/Execution.java | 11 ++++-
 .../runtime/executiongraph/ExecutionGraph.java  |  1 -
 .../runtime/io/network/ChannelManager.java      | 10 ++++-
 .../jobmanager/web/JobmanagerInfoServlet.java   | 26 ++++++-----
 .../jobmanager/web/SetupInfoServlet.java        | 11 +++--
 .../runtime/jobmanager/web/WebInfoServer.java   | 15 +++++--
 .../taskmanager/TaskInputSplitProvider.java     |  9 +++-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 37 ++++++---------
 .../apache/flink/runtime/client/JobClient.scala | 18 ++++----
 .../flink/runtime/jobmanager/JobManager.scala   | 47 +++++++++++++++-----
 .../jobmanager/JobManagerCLIConfiguration.scala |  8 +++-
 .../runtime/jobmanager/WithWebServer.scala      |  2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 15 ++++---
 .../minicluster/LocalFlinkMiniCluster.scala     | 36 ---------------
 .../flink/runtime/taskmanager/TaskManager.scala | 30 +++++++++----
 .../taskmanager/TaskManagerConfiguration.scala  |  5 ++-
 .../runtime/taskmanager/TaskManagerTest.java    | 17 ++++---
 .../runtime/jobmanager/JobManagerITCase.scala   |  2 +
 .../test/cancelling/CancellingTestBase.java     |  7 ++-
 27 files changed, 221 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 85ab07a..a4d1ac6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -63,6 +64,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
 import org.apache.flink.util.StringUtils;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Implementation of a simple command line fronted for executing programs.
@@ -511,7 +513,7 @@ public class CliFrontend {
                        }
 
                        Iterable<ExecutionGraph> jobs = 
AkkaUtils.<RunningJobs>ask(jobManager,
-                                       
RequestRunningJobs$.MODULE$).asJavaIterable();
+                                       RequestRunningJobs$.MODULE$, 
getAkkaTimeout()).asJavaIterable();
 
                        ArrayList<ExecutionGraph> runningJobs = null;
                        ArrayList<ExecutionGraph> scheduledJobs = null;
@@ -632,7 +634,7 @@ public class CliFrontend {
                                return 1;
                        }
 
-                       AkkaUtils.ask(jobManager, new CancelJob(jobId));
+                       AkkaUtils.ask(jobManager, new CancelJob(jobId), 
getAkkaTimeout());
                        return 0;
                }
                catch (Throwable t) {
@@ -756,7 +758,8 @@ public class CliFrontend {
                }
 
                return JobManager.getJobManager(jobManagerAddress,
-                               ActorSystem.create("CliFrontendActorSystem", 
AkkaUtils.getDefaultActorSystemConfig()));
+                               ActorSystem.create("CliFrontendActorSystem", 
AkkaUtils
+                                               
.getDefaultActorSystemConfig()),getAkkaTimeout());
        }
        
        
@@ -815,6 +818,13 @@ public class CliFrontend {
                }
                return GlobalConfiguration.getConfiguration();
        }
+
+       protected FiniteDuration getAkkaTimeout(){
+               Configuration config = getGlobalConfiguration();
+
+               return new 
FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), 
TimeUnit.SECONDS);
+       }
        
        public static List<Tuple2<String, String>> getDynamicProperties(String 
dynamicPropertiesEncoded) {
                List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, 
String>>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index d2c9983..203f294 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -53,6 +54,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote 
cluster.
@@ -301,12 +303,15 @@ public class Client {
                String hostname = configuration.getString(ConfigConstants
                                .JOB_MANAGER_IPC_ADDRESS_KEY, null);
 
+               FiniteDuration timeout = new 
FiniteDuration(configuration.getInteger(ConfigConstants
+                               .AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+
                if(hostname == null){
                        throw new ProgramInvocationException("Could not find 
hostname of job manager.");
                }
 
                try {
-                       JobClient.uploadJarFiles(jobGraph, hostname, client);
+                       JobClient.uploadJarFiles(jobGraph, hostname, client, 
timeout);
                }catch(IOException e){
                        throw new ProgramInvocationException("Could not upload 
blobs.", e);
                }
@@ -317,7 +322,7 @@ public class Client {
                                return JobClient.submitJobAndWait(jobGraph, 
printStatusDuringExecution, client);
                        }
                        else {
-                               SubmissionResponse response 
=JobClient.submitJobDetached(jobGraph, client);
+                               SubmissionResponse response 
=JobClient.submitJobDetached(jobGraph, client, timeout);
 
                                if(response instanceof SubmissionFailure){
                                        SubmissionFailure failure = 
(SubmissionFailure) response;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
index ba7f112..e339ec7 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import scala.concurrent.duration.FiniteDuration;
 
 
 public class JobsInfoServlet extends HttpServlet {
@@ -51,11 +53,15 @@ public class JobsInfoServlet extends HttpServlet {
        private final Configuration config;
 
        private final ActorSystem system;
+
+       private final FiniteDuration timeout;
        
        public JobsInfoServlet(Configuration flinkConfig) {
                this.config = flinkConfig;
                system = ActorSystem.create("JobsInfoServletActorSystem",
                                AkkaUtils.getDefaultActorSystemConfig());
+               this.timeout = new 
FiniteDuration(flinkConfig.getInteger(ConfigConstants
+                               .AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
        }
 
        @Override
@@ -67,10 +73,11 @@ public class JobsInfoServlet extends HttpServlet {
                        int jmPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
                                        
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
 
-                       ActorRef jm = JobManager.getJobManager(new 
InetSocketAddress(jmHost, jmPort), system);
+                       ActorRef jm = JobManager.getJobManager(new 
InetSocketAddress(jmHost, jmPort), system,
+                                       timeout);
 
                        Iterator<ExecutionGraph> graphs = 
AkkaUtils.<RunningJobs>ask(jm,
-                                       
RequestRunningJobs$.MODULE$).asJavaIterable().iterator();
+                                       RequestRunningJobs$.MODULE$, 
timeout).asJavaIterable().iterator();
 
 
                        resp.setStatus(HttpServletResponse.SC_OK);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
 
b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index 1de21ff..ad7b6d4 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -95,7 +95,7 @@ public class WebInterfaceServer {
                        throw new FileNotFoundException("Cannot start web 
interface server because the web " +
                                        "root dir " + WEB_ROOT_DIR + " is not 
included in the jar.");
                }
-               
+
                String tmpDirPath = 
config.getString(ConfigConstants.WEB_TMP_DIR_KEY,
                        ConfigConstants.DEFAULT_WEB_TMP_DIR);
                
@@ -155,7 +155,8 @@ public class WebInterfaceServer {
                ServletContextHandler servletContext = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
                servletContext.setContextPath("/");
                servletContext.addServlet(new ServletHolder(new 
PactJobJSONServlet(uploadDir)), "/pactPlan");
-               servletContext.addServlet(new ServletHolder(new 
JobsInfoServlet(nepheleConfig)), "/jobsInfo");
+               servletContext.addServlet(new ServletHolder(new 
JobsInfoServlet(nepheleConfig)),
+                               "/jobsInfo");
                servletContext.addServlet(new ServletHolder(new 
PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
                servletContext.addServlet(new ServletHolder(new 
JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
                servletContext.addServlet(new ServletHolder(new 
JobSubmissionServlet(nepheleConfig, uploadDir, planDumpDir)),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 7f81ccd..aa7f4aa 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -43,8 +43,10 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -105,7 +107,7 @@ public class ClientTest {
        @Test
        public void shouldSubmitToJobClient() throws 
ProgramInvocationException, IOException {
                when(mockJobClient.submitJobDetached(any(JobGraph.class),
-                               
any(ActorRef.class))).thenReturn(mockSubmissionSuccess);
+                               any(ActorRef.class), 
any(FiniteDuration.class))).thenReturn(mockSubmissionSuccess);
 
                Client out = new Client(configMock, 
getClass().getClassLoader());
                out.run(program.getPlanWithJars(), -1, false);
@@ -118,7 +120,7 @@ public class ClientTest {
        @Test(expected = ProgramInvocationException.class)
        public void shouldThrowException() throws Exception {
                when(mockJobClient.submitJobDetached(any(JobGraph.class),
-                               
any(ActorRef.class))).thenReturn(mockSubmissionFailure);
+                               any(ActorRef.class), 
any(FiniteDuration.class))).thenReturn(mockSubmissionFailure);
 
                Client out = new Client(configMock, 
getClass().getClassLoader());
                out.run(program.getPlanWithJars(), -1, false);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4e87581..047cfd1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -346,6 +346,11 @@ public final class ConfigConstants {
         * Log level for akka
         */
        public static final String AKKA_LOG_LEVEL = "akka.loglevel";
+
+       /**
+        * Timeout for all blocking calls
+        */
+       public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
        
        // ----------------------------- Miscellaneous 
----------------------------
        
@@ -594,6 +599,8 @@ public final class ConfigConstants {
        public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
        public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
+
+       public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
        
 
        // ----------------------------- LocalExecution 
----------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh 
b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 78c9500..70dde8a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -80,7 +80,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting job manager
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager 
--configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager 
--executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < 
/dev/null &
         echo $! > $pid
     ;;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat 
b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 05d14e5..374254f 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on 
http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster --configDir 
%FLINK_CONF_DIR%  > "%out%"  2>&1
+java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% 
org.apache.flink.runtime.jobmanager.JobManager --executionMode local 
--configDir %FLINK_CONF_DIR%  > "%out%"  2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a42faf3..2024d0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,10 +27,13 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,6 +54,7 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be 
executed multiple times (for recovery,
@@ -78,6 +82,9 @@ public class Execution {
        private static final Logger LOG = ExecutionGraph.LOG;
        
        private static final int NUM_CANCEL_CALL_TRIES = 3;
+
+       public static FiniteDuration timeout = new 
FiniteDuration(ConfigConstants
+                       .DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -273,7 +280,7 @@ public class Execution {
 
                        Instance instance = slot.getInstance();
                        Future<Object> deployAction = 
Patterns.ask(instance.getTaskManager(),
-                                       new 
TaskManagerMessages.SubmitTask(deployment),AkkaUtils.FUTURE_TIMEOUT());
+                                       new 
TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout));
 
                        deployAction.onComplete(new OnComplete<Object>(){
 
@@ -583,7 +590,7 @@ public class Execution {
 
                Future<Object> cancelResult = 
AkkaUtils.retry(slot.getInstance().getTaskManager(), new
                                TaskManagerMessages.CancelTask(attemptId), 
NUM_CANCEL_CALL_TRIES,
-                               AkkaUtils.globalExecutionContext());
+                               AkkaUtils.globalExecutionContext(), timeout);
 
                cancelResult.onComplete(new OnComplete<Object>(){
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 99053ea..4e6a56b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -103,7 +103,6 @@ public class ExecutionGraph {
        
        private final long[] stateTimestamps;
        
-       
        private final Object progressLock = new Object();
        
        private int nextVertexToFinish;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 9e3b23c..cd07cc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
+import scala.concurrent.duration.FiniteDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,14 +79,19 @@ public class ChannelManager implements EnvelopeDispatcher, 
BufferProviderBroker
        
        private final DiscardBufferPool discardBufferPool;
 
+       private final FiniteDuration timeout;
+
        // 
-----------------------------------------------------------------------------------------------------------------
 
        public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo 
connectionInfo, int numNetworkBuffers,
-                                               int networkBufferSize, 
NetworkConnectionManager networkConnectionManager) throws IOException {
+                                               int networkBufferSize, 
NetworkConnectionManager networkConnectionManager,
+                                               FiniteDuration timeout) throws 
IOException {
 
                this.channelLookup= channelLookup;
                this.connectionInfo = connectionInfo;
 
+               this.timeout = timeout;
+
                try {
                        this.globalBufferPool = new 
GlobalBufferPool(numNetworkBuffers, networkBufferSize);
                } catch (Throwable e) {
@@ -378,7 +384,7 @@ public class ChannelManager implements EnvelopeDispatcher, 
BufferProviderBroker
                                try{
                                        lookupResponse = 
AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
                                                        new 
JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-                                                                       
sourceChannelID)).response();
+                                                                       
sourceChannelID), timeout).response();
                                }catch(IOException ioe) {
                                        throw ioe;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index f52da0d..e1afa7a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
+import scala.concurrent.duration.FiniteDuration;
 
 
 public class JobmanagerInfoServlet extends HttpServlet {
@@ -75,11 +76,13 @@ public class JobmanagerInfoServlet extends HttpServlet {
        /** Underlying JobManager */
        private final ActorRef jobmanager;
        private final ActorRef archive;
+       private final FiniteDuration timeout;
        
        
-       public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive) {
+       public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, 
FiniteDuration timeout) {
                this.jobmanager = jobmanager;
                this.archive = archive;
+               this.timeout = timeout;
        }
        
        
@@ -92,14 +95,15 @@ public class JobmanagerInfoServlet extends HttpServlet {
                try {
                        if("archive".equals(req.getParameter("get"))) {
                                List<ExecutionGraph> archivedJobs = new 
ArrayList<ExecutionGraph>(AkkaUtils
-                                               
.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$).asJavaCollection());
+                                               
.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+                                               .asJavaCollection());
 
                                writeJsonForArchive(resp.getWriter(), 
archivedJobs);
                        }
                        else if("job".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
                                JobResponse response = AkkaUtils.ask(archive,
-                                               new 
RequestJob(JobID.fromHexString(jobId)));
+                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
 
                                if(response instanceof JobFound){
                                        ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
@@ -113,7 +117,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
                                String groupvertexId = 
req.getParameter("groupvertex");
 
                                JobResponse response = AkkaUtils.ask(archive,
-                                               new 
RequestJob(JobID.fromHexString(jobId)));
+                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
 
                                if(response instanceof JobFound && 
groupvertexId != null){
                                        ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
@@ -126,9 +130,9 @@ public class JobmanagerInfoServlet extends HttpServlet {
                        }
                        else if("taskmanagers".equals(req.getParameter("get"))) 
{
                                int numberOfTaskManagers = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestNumberRegisteredTaskManager$.MODULE$);
+                                               
RequestNumberRegisteredTaskManager$.MODULE$, timeout);
                                int numberOfRegisteredSlots = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestTotalNumberOfSlots$.MODULE$);
+                                               
RequestTotalNumberOfSlots$.MODULE$, timeout);
 
                                resp.getWriter().write("{\"taskmanagers\": " + 
numberOfTaskManagers +", " +
                                                "\"slots\": 
"+numberOfRegisteredSlots+"}");
@@ -136,7 +140,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
                        else if("cancel".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
                                AkkaUtils.<CancellationResponse>ask(jobmanager,
-                                               new 
CancelJob(JobID.fromHexString(jobId)));
+                                               new 
CancelJob(JobID.fromHexString(jobId)), timeout);
                        }
                        else if("updates".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
@@ -146,7 +150,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
                        }
                        else{
                                Iterable<ExecutionGraph> runningJobs = 
AkkaUtils.<RunningJobs>ask
-                                               (jobmanager, 
RequestRunningJobs$.MODULE$).asJavaIterable();
+                                               (jobmanager, 
RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
                                writeJsonForJobs(resp.getWriter(), runningJobs);
                        }
                        
@@ -324,7 +328,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
                        
                        // write accumulators
                        AccumulatorResultsResponse response = 
AkkaUtils.ask(jobmanager,
-                                       new 
RequestAccumulatorResults(graph.getJobID()));
+                                       new 
RequestAccumulatorResults(graph.getJobID()), timeout);
 
                        if(response instanceof AccumulatorResultsFound){
                                Map<String, Object> accMap = 
((AccumulatorResultsFound)response).asJavaMap();
@@ -417,7 +421,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
                
                try {
                        Iterable<ExecutionGraph> graphs = 
AkkaUtils.<RunningJobs>ask(jobmanager,
-                                       
RequestRunningJobs$.MODULE$).asJavaIterable();
+                                       RequestRunningJobs$.MODULE$, 
timeout).asJavaIterable();
                        
                        //Serialize job to json
                        wrt.write("{");
@@ -439,7 +443,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 
                        wrt.write("],");
 
-                       JobResponse response = AkkaUtils.ask(jobmanager, new 
RequestJob(jobId));
+                       JobResponse response = AkkaUtils.ask(jobmanager, new 
RequestJob(jobId), timeout);
 
                        if(response instanceof JobFound){
                                ExecutionGraph graph = 
((JobFound)response).executionGraph();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index c0308cb..9e0a55b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -46,6 +46,7 @@ import org.codehaus.jettison.json.JSONObject;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A Servlet that displays the Configuration in the web interface.
@@ -59,13 +60,15 @@ public class SetupInfoServlet extends HttpServlet {
        private static final Logger LOG = 
LoggerFactory.getLogger(SetupInfoServlet.class);
        
        
-       private Configuration globalC;
-       private ActorRef jobmanager;
+       final private Configuration globalC;
+       final private ActorRef jobmanager;
+       final private FiniteDuration timeout;
        
        
-       public SetupInfoServlet(ActorRef jm) {
+       public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) {
                globalC = GlobalConfiguration.getConfiguration();
                this.jobmanager = jm;
+               this.timeout = timeout;
        }
        
        @Override
@@ -104,7 +107,7 @@ public class SetupInfoServlet extends HttpServlet {
        private void writeTaskmanagers(HttpServletResponse resp) throws 
IOException {
 
                List<Instance> instances = new 
ArrayList<Instance>(AkkaUtils.<RegisteredTaskManagers>ask
-                               (jobmanager, 
RequestRegisteredTaskManagers$.MODULE$).asJavaCollection());
+                               (jobmanager, 
RequestRegisteredTaskManagers$.MODULE$, timeout).asJavaCollection());
 
                Collections.sort(instances, INSTANCE_SORTER);
                                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index c397a30..7e67603 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -40,6 +40,7 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import scala.concurrent.duration.FiniteDuration;
 
 
 /**
@@ -64,6 +65,11 @@ public class WebInfoServer {
        private final Server server;
 
        /**
+        * Timeout for akka requests
+        */
+       private final FiniteDuration timeout;
+
+       /**
         * Port for info server
         */
        private int port;
@@ -78,10 +84,12 @@ public class WebInfoServer {
         *         Thrown, if the server setup failed for an I/O related reason.
         */
        public WebInfoServer(Configuration config, ActorRef jobmanager,
-                                               ActorRef archive) throws 
IOException {
+                                               ActorRef archive, 
FiniteDuration timeout) throws IOException {
                this.port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 
+               this.timeout = timeout;
+
                // if no explicit configuration is given, use the global 
configuration
                if (config == null) {
                        config = GlobalConfiguration.getConfiguration();
@@ -122,9 +130,10 @@ public class WebInfoServer {
                ServletContextHandler servletContext = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
                servletContext.setContextPath("/");
                servletContext.addServlet(new ServletHolder(new 
JobmanagerInfoServlet(jobmanager,
-                               archive)), "/jobsInfo");
+                               archive, timeout)), "/jobsInfo");
                servletContext.addServlet(new ServletHolder(new 
LogfileInfoServlet(logDirFiles)), "/logInfo");
-               servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(jobmanager)), "/setupInfo");
+               servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(jobmanager, timeout)),
+                               "/setupInfo");
                servletContext.addServlet(new ServletHolder(new MenuServlet()), 
"/menu");
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 8049004..669f94c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
 
@@ -34,18 +35,22 @@ public class TaskInputSplitProvider implements 
InputSplitProvider {
        private final JobID jobId;
        
        private final JobVertexID vertexId;
+
+       private final FiniteDuration timeout;
        
-       public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, 
JobVertexID vertexId) {
+       public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, 
JobVertexID vertexId,
+                                                                 
FiniteDuration timeout) {
                this.jobManager = jobManager;
                this.jobId = jobId;
                this.vertexId = vertexId;
+               this.timeout = timeout;
        }
 
        @Override
        public InputSplit getNextInputSplit() {
                try {
                        TaskManagerMessages.NextInputSplit nextInputSplit = 
AkkaUtils.ask(jobManager,
-                                       new 
JobManagerMessages.RequestNextInputSplit(jobId, vertexId));
+                                       new 
JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout);
 
                        return nextInputSplit.inputSplit();
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ce87e0e..f931497 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -30,9 +30,7 @@ import scala.concurrent.{ExecutionContext, Future, Await}
 import scala.concurrent.duration._
 
 object AkkaUtils {
-  implicit val FUTURE_TIMEOUT: Timeout = 100 minute
-  implicit val AWAIT_DURATION: FiniteDuration = 1 minute
-  implicit val FUTURE_DURATION: FiniteDuration = 1 minute
+  val DEFAULT_TIMEOUT: FiniteDuration = 1 minute
 
   val INF_TIMEOUT = 21474835 seconds
 
@@ -122,34 +120,27 @@ object AkkaUtils {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }
 
-  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): 
ActorRef = {
-    Await.result(system.actorSelection(parent.path / child).resolveOne(), 
AWAIT_DURATION)
+  def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, 
timeout:
+  FiniteDuration): ActorRef = {
+    Await.result(system.actorSelection(parent.path / 
child).resolveOne()(timeout), timeout)
   }
 
-  def getReference(path: String)(implicit system: ActorSystem): ActorRef = {
-    Await.result(system.actorSelection(path).resolveOne(), AWAIT_DURATION)
+  def getReference(path: String)(implicit system: ActorSystem, timeout: 
FiniteDuration): ActorRef
+  = {
+    Await.result(system.actorSelection(path).resolveOne()(timeout), timeout)
   }
 
   @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any): T = {
-    ask(actorSelection, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any): T = {
-    ask(actor, msg, FUTURE_TIMEOUT, FUTURE_DURATION)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any, timeout: Timeout, 
duration: Duration): T = {
+  def ask[T](actorSelection: ActorSelection, msg: Any)(implicit timeout: 
FiniteDuration): T
+    = {
     val future = Patterns.ask(actorSelection, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
 
   @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any, timeout: Timeout, duration: Duration): 
T = {
+  def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = 
{
     val future = Patterns.ask(actor, msg, timeout)
-    Await.result(future, duration).asInstanceOf[T]
+    Await.result(future, timeout).asInstanceOf[T]
   }
 
   def askInf[T](actor: ActorRef, msg: Any): T = {
@@ -174,8 +165,8 @@ object AkkaUtils {
   }
 
   def retry(target: ActorRef, message: Any, tries: Int)(implicit 
executionContext:
-  ExecutionContext): Future[Any] = {
-    (target ? message) recoverWith{
+  ExecutionContext, timeout: FiniteDuration): Future[Any] = {
+    (target ? message)(timeout) recoverWith{
       case t: Throwable =>
         if(tries > 0){
           retry(target, message, tries-1)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index a9733de..0d9f672 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -35,13 +35,14 @@ import 
org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
 import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{FiniteDuration, Duration}
 
 
-class JobClient(jobManagerURL: String) extends Actor with ActorLogMessages 
with ActorLogging{
+class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor 
with ActorLogMessages
+with  ActorLogging{
   import context._
 
-  val jobManager = AkkaUtils.getReference(jobManagerURL)
+  val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
@@ -120,15 +121,16 @@ object JobClient{
   }
 
 
-  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef): 
SubmissionResponse = {
-    import AkkaUtils.FUTURE_TIMEOUT
-    val response = jobClient ? SubmitJobDetached(jobGraph)
+  def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit 
timeout: FiniteDuration):
+  SubmissionResponse = {
+    val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
 
-    Await.result(response.mapTo[SubmissionResponse],AkkaUtils.FUTURE_DURATION)
+    Await.result(response.mapTo[SubmissionResponse],timeout)
   }
 
   @throws(classOf[IOException])
-  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: 
ActorRef): Unit = {
+  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: 
ActorRef)(implicit timeout:
+   FiniteDuration): Unit = {
     val port = AkkaUtils.ask[Int](jobClient, RequestBlobManagerPort)
 
     val serverAddress = new InetSocketAddress(hostname, port)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 75f2a63..1fa89c1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.File
 import java.net.{InetSocketAddress}
+import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import akka.pattern.Patterns
@@ -28,10 +29,11 @@ import com.google.common.base.Preconditions
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, 
ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, 
ExecutionGraph}
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -52,7 +54,10 @@ import scala.concurrent.duration._
 class JobManager(val configuration: Configuration) extends
 Actor with ActorLogMessages with ActorLogging with WrapAsScala {
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
+  implicit val timeout = 
FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
+  Execution.timeout = timeout;
 
   log.info("Starting job manager.")
 
@@ -329,7 +334,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     case RequestJobStatus(jobID) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, 
executionGraph.getState)
-        case None => archive ? RequestJobStatus(jobID) pipeTo sender()
+        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo 
sender()
       }
     }
 
@@ -344,7 +349,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     case RequestJob(jobID) => {
       currentJobs.get(jobID) match {
         case Some((eg, _)) => sender() ! JobFound(jobID, eg)
-        case None => archive ? RequestJob(jobID) pipeTo sender()
+        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender()
       }
     }
 
@@ -384,6 +389,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 }
 
 object JobManager {
+  import ExecutionMode._
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
   val FAILURE_RETURN_CODE = 1
   val JOB_MANAGER_NAME = "jobmanager"
@@ -392,19 +398,34 @@ object JobManager {
   val PROFILER_NAME = "profiler"
 
   def main(args: Array[String]): Unit = {
-    val (hostname, port, configuration) = parseArgs(args)
+    val (hostname, port, configuration, executionMode) = parseArgs(args)
 
     val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, 
configuration)
 
     startActor(Props(new JobManager(configuration) with 
WithWebServer))(jobManagerSystem)
+
+    if(executionMode.equals(LOCAL)){
+      TaskManager.startActorWithConfiguration(hostname, configuration, 
true)(jobManagerSystem)
+    }
+
     jobManagerSystem.awaitTermination()
+    println("Shutting down.")
   }
 
-  def parseArgs(args: Array[String]): (String, Int, Configuration) = {
+  def parseArgs(args: Array[String]): (String, Int, Configuration, 
ExecutionMode) = {
     val parser = new 
scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
       head("flink jobmanager")
       opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text 
("Specify " +
         "configuration directory.")
+      opt[String]("executionMode") optional() action { (x, c) =>
+        if(x.equals("local")){
+          c.copy(executionMode = LOCAL)
+        }else{
+          c.copy(executionMode = CLUSTER)
+        }
+      } text {
+        "Specify execution mode of job manager"
+      }
     }
 
     parser.parse(args, JobManagerCLIConfiguration()) map {
@@ -419,7 +440,7 @@ object JobManager {
         val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        (hostname, port, configuration)
+        (hostname, port, configuration, config.executionMode)
     } getOrElse {
       LOG.error("CLI Parsing failed. Usage: " + parser.usage)
       sys.exit(FAILURE_RETURN_CODE)
@@ -456,19 +477,23 @@ object JobManager {
     s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
   }
 
-  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem): 
ActorRef = {
+  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: 
FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, PROFILER_NAME)
   }
 
-  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem): 
ActorRef = {
+  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem, 
timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getChild(jobManager, EVENT_COLLECTOR_NAME)
   }
 
-  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem): 
ActorRef = {
+  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem, 
timeout: FiniteDuration):
+  ActorRef = {
     AkkaUtils.getChild(jobManager, ARCHIVE_NAME)
   }
 
-  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem): 
ActorRef = {
+  def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, 
timeout:
+  FiniteDuration): ActorRef = {
     AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + 
address.getPort))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
index d588f95..d08aecc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
@@ -18,5 +18,11 @@
 
 package org.apache.flink.runtime.jobmanager
 
-case class JobManagerCLIConfiguration(configDir: String = null) {
+object ExecutionMode extends Enumeration{
+  type ExecutionMode = Value
+  val LOCAL = Value
+  val CLUSTER = Value
 }
+
+case class JobManagerCLIConfiguration(configDir: String = null, executionMode: 
ExecutionMode
+.ExecutionMode = ExecutionMode.CLUSTER) {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
index 81fecbf..715fc0c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 trait WithWebServer extends Actor {
   that: JobManager =>
 
-  val webServer = new WebInfoServer(configuration,self, archive)
+  val webServer = new WebInfoServer(configuration,self, archive, timeout)
   webServer.start()
 
   abstract override def postStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 35def3d..6d0da27 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.util.concurrent.TimeUnit
+
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
@@ -27,11 +29,15 @@ import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 
 abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   import FlinkMiniCluster._
 
+  implicit val timeout = 
FiniteDuration(userConfiguration.getInteger(ConfigConstants
+    .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), 
TimeUnit.SECONDS)
+
   val configuration = generateConfiguration(userConfiguration)
 
   val jobManagerActorSystem = startJobManagerActorSystem()
@@ -92,19 +98,18 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration) {
   }
 
   def awaitTermination(): Unit = {
-    taskManagerActorSystems foreach { 
_.awaitTermination(AkkaUtils.AWAIT_DURATION)}
-    jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION)
+    taskManagerActorSystems foreach { _.awaitTermination()}
+    jobManagerActorSystem.awaitTermination()
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {
-    implicit val timeout = AkkaUtils.FUTURE_TIMEOUT
     implicit val executionContext = AkkaUtils.globalExecutionContext
 
     val futures = taskManagerActors map {
-      _ ? NotifyWhenRegisteredAtJobManager
+      taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
     }
 
-    Await.ready(Future.sequence(futures), AkkaUtils.AWAIT_DURATION)
+    Await.ready(Future.sequence(futures), timeout)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8625983..fb6e36b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -118,40 +118,4 @@ FlinkMiniCluster(userConfiguration){
 
 object LocalFlinkMiniCluster{
   val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-  val FAILURE_RETURN_CODE = 1
-
-  def main(args: Array[String]): Unit = {
-    val configuration = parseArgs(args)
-
-    val cluster = new LocalFlinkMiniCluster(configuration)
-
-    cluster.awaitTermination()
-  }
-
-  def parseArgs(args: Array[String]): Configuration = {
-    val parser = new 
OptionParser[LocalFlinkMiniClusterConfiguration]("LocalFlinkMiniCluster") {
-      head("LocalFlinkMiniCluster")
-      opt[String]("configDir") action { (value, config) => 
config.copy(configDir = value) } text
-        {"Specify configuration directory."}
-    }
-
-    parser.parse(args, LocalFlinkMiniClusterConfiguration()) map {
-      config =>{
-        GlobalConfiguration.loadConfiguration(config.configDir)
-        val configuration = GlobalConfiguration.getConfiguration
-
-        if(config.configDir != null && new File(config.configDir).isDirectory){
-          configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
config.configDir + "/..")
-        }
-
-        configuration
-      }
-    } getOrElse{
-      LOG.error("CLI parsing failed. Usage: " + parser.usage)
-      sys.exit(FAILURE_RETURN_CODE)
-    }
-  }
-
-
-  case class LocalFlinkMiniClusterConfiguration(val configDir: String = "")
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 136dc7f..66d25c5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -69,8 +69,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, 
val jobManagerAkka
   extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala 
with WrapAsScala {
 
   import context._
-  import AkkaUtils.FUTURE_TIMEOUT
-  import taskManagerConfig._
+  import taskManagerConfig.{timeout => tmTimeout, _}
+  implicit val timeout = tmTimeout
+
 
   log.info(s"Starting task manager at ${self.path}.")
 
@@ -172,7 +173,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
         jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, 
numberOfSlots)
       } else {
         log.error("TaskManager could not register at JobManager.");
-        throw new RuntimeException("TaskManager could not register at 
JobManager");
+        self ! PoisonPill
       }
     }
 
@@ -182,6 +183,8 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
+        context.watch(currentJobManager)
+
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
             .toString
@@ -247,7 +250,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
           case None =>
         }
 
-        val splitProvider = new TaskInputSplitProvider(currentJobManager, 
jobID, vertexID)
+        val splitProvider = new TaskInputSplitProvider(currentJobManager, 
jobID, vertexID, timeout)
         val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, 
memoryManager,
           ioManager, splitProvider,currentJobManager)
 
@@ -356,14 +359,19 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
           log.error(s"Cannot find task with ID ${executionID} to unregister.")
       }
     }
+
+    case Terminated(jobManager) => {
+      log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to 
reregister.")
+      tryJobManagerRegistration()
+    }
   }
 
   def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
                                  executionState: ExecutionState,
                                  optionalError: Throwable): Unit = {
     log.info(s"Update execution state to ${executionState}.")
-    val futureResponse = currentJobManager ? UpdateTaskExecutionState(new 
TaskExecutionState
-    (jobID, executionID, executionState, optionalError))
+    val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new 
TaskExecutionState
+    (jobID, executionID, executionState, optionalError)))(timeout)
 
     val receiver = this.self
 
@@ -402,7 +410,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
       }
 
       channelManager = Some(new ChannelManager(currentJobManager, 
connectionInfo, numBuffers,
-        bufferSize, connectionManager))
+        bufferSize, connectionManager, timeout))
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate ChannelManager.")
@@ -412,7 +420,8 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 
   def setupLibraryCacheManager(blobPort: Int): Unit = {
     if(blobPort > 0){
-      val address = new 
InetSocketAddress(currentJobManager.path.address.host.get, blobPort)
+      val address = new 
InetSocketAddress(currentJobManager.path.address.host.getOrElse
+        ("localhost"), blobPort)
       libraryCacheManager = new BlobLibraryCacheManager(new 
BlobCache(address), cleanupInterval)
     }else{
       libraryCacheManager = new FallbackLibraryCacheManager
@@ -598,8 +607,11 @@ object TaskManager {
       .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
+    val timeout = 
FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
     val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, 
memorySize, pageSize,
-      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval)
+      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, 
timeout)
 
     (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfiguration)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 150db73..a6a76a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import scala.concurrent.duration.FiniteDuration
+
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
                                     tmpDirPaths: Array[String], cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
-                                    profilingInterval: Option[Long])
+                                    profilingInterval: Option[Long],
+                                    timeout: FiniteDuration)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index c6b4fb5..b76944b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -32,6 +32,7 @@ import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -73,6 +74,8 @@ public class TaskManagerTest {
 
        private static ActorSystem system;
 
+       private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+
        @BeforeClass
        public static void setup(){
                system = ActorSystem.create("TestActorSystem", TestingUtils.testConfig());
@@ -178,7 +181,7 @@ public class TaskManagerTest {
                                                        expectMsgEquals(new TaskOperationResult(eid1, true));
 
                                                        Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
 
                                                        assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
@@ -197,7 +200,7 @@ public class TaskManagerTest {
                                                        expectMsgEquals(new TaskOperationResult(eid2, true));
 
                                                        response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
 
                                                        assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
@@ -336,13 +339,13 @@ public class TaskManagerTest {
                        // we get to the check, so we need to guard the check
                                                if (t1 != null) {
                                                        Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
                                                }
 
                                                if (t2 != null) {
                                                        Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
                                assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
                        }
@@ -424,7 +427,7 @@ public class TaskManagerTest {
 
                                                if (t2 != null) {
                                                        Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
                                                }
 
@@ -434,7 +437,7 @@ public class TaskManagerTest {
                                                                expectMsgEquals(new TaskOperationResult(eid1, true));
                                                        }
                                                        Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       AkkaUtils.FUTURE_TIMEOUT());
+                                                                       timeout);
                                                        Await.ready(response, d);
                                                }
 
@@ -538,7 +541,7 @@ public class TaskManagerTest {
                ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
 
                Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$,
-                               AkkaUtils.FUTURE_TIMEOUT());
+                               timeout);
 
                try {
                        FiniteDuration d = new FiniteDuration(2, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index e23992c..6689f93 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -37,6 +37,8 @@ import scala.concurrent.duration._
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
 WordSpecLike with Matchers with BeforeAndAfterAll {
+  implicit val timeout = 1 minute
+
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 2e08dc3..6347cb5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -23,9 +23,8 @@ import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts;
 import akka.pattern.Patterns;
-import com.amazonaws.http.ExecutionContext;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -116,7 +115,7 @@ public abstract class CancellingTestBase {
                        boolean jobSuccessfullyCancelled = false;
 
                        Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
-                                       (jobGraph, false), AkkaUtils.FUTURE_TIMEOUT());
+                                       (jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT()));
 
                        actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
                                                        TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
@@ -125,7 +124,7 @@ public abstract class CancellingTestBase {
                                                        throw new IllegalStateException("Job restarted");
 
                        try {
-                               Await.result(result, AkkaUtils.AWAIT_DURATION());
+                               Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());
                        } catch (JobExecutionException exception) {
                                if (!exception.isJobCanceledByUser()) {
                                        throw new IllegalStateException("Job Failed.");

Reply via email to