Reworked local cluster start. TaskManager watches JobManager and attempts re-registration in case of disconnect. Introduced the akka.ask.timeout config parameter to configure Akka timeouts.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b8d0a0aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b8d0a0aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b8d0a0aa Branch: refs/heads/master Commit: b8d0a0aadaed012ccb93176dcc21acbdbd005eea Parents: 26c7794 Author: Till Rohrmann <[email protected]> Authored: Wed Nov 12 19:48:36 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:31 2014 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 16 +++++-- .../org/apache/flink/client/program/Client.java | 9 +++- .../flink/client/web/JobsInfoServlet.java | 11 ++++- .../flink/client/web/WebInterfaceServer.java | 5 ++- .../apache/flink/client/program/ClientTest.java | 6 ++- .../flink/configuration/ConfigConstants.java | 7 +++ flink-dist/src/main/flink-bin/bin/jobmanager.sh | 2 +- .../src/main/flink-bin/bin/start-local.bat | 2 +- .../flink/runtime/executiongraph/Execution.java | 11 ++++- .../runtime/executiongraph/ExecutionGraph.java | 1 - .../runtime/io/network/ChannelManager.java | 10 ++++- .../jobmanager/web/JobmanagerInfoServlet.java | 26 ++++++----- .../jobmanager/web/SetupInfoServlet.java | 11 +++-- .../runtime/jobmanager/web/WebInfoServer.java | 15 +++++-- .../taskmanager/TaskInputSplitProvider.java | 9 +++- .../apache/flink/runtime/akka/AkkaUtils.scala | 37 ++++++--------- .../apache/flink/runtime/client/JobClient.scala | 18 ++++---- .../flink/runtime/jobmanager/JobManager.scala | 47 +++++++++++++++----- .../jobmanager/JobManagerCLIConfiguration.scala | 8 +++- .../runtime/jobmanager/WithWebServer.scala | 2 +- .../runtime/minicluster/FlinkMiniCluster.scala | 15 ++++--- .../minicluster/LocalFlinkMiniCluster.scala | 36 --------------- .../flink/runtime/taskmanager/TaskManager.scala | 30 +++++++++---- 
.../taskmanager/TaskManagerConfiguration.scala | 5 ++- .../runtime/taskmanager/TaskManagerTest.java | 17 ++++--- .../runtime/jobmanager/JobManagerITCase.scala | 2 + .../test/cancelling/CancellingTestBase.java | 7 ++- 27 files changed, 221 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 85ab07a..a4d1ac6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -33,6 +33,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -63,6 +64,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; import org.apache.flink.util.StringUtils; +import scala.concurrent.duration.FiniteDuration; /** * Implementation of a simple command line fronted for executing programs. 
@@ -511,7 +513,7 @@ public class CliFrontend { } Iterable<ExecutionGraph> jobs = AkkaUtils.<RunningJobs>ask(jobManager, - RequestRunningJobs$.MODULE$).asJavaIterable(); + RequestRunningJobs$.MODULE$, getAkkaTimeout()).asJavaIterable(); ArrayList<ExecutionGraph> runningJobs = null; ArrayList<ExecutionGraph> scheduledJobs = null; @@ -632,7 +634,7 @@ public class CliFrontend { return 1; } - AkkaUtils.ask(jobManager, new CancelJob(jobId)); + AkkaUtils.ask(jobManager, new CancelJob(jobId), getAkkaTimeout()); return 0; } catch (Throwable t) { @@ -756,7 +758,8 @@ public class CliFrontend { } return JobManager.getJobManager(jobManagerAddress, - ActorSystem.create("CliFrontendActorSystem", AkkaUtils.getDefaultActorSystemConfig())); + ActorSystem.create("CliFrontendActorSystem", AkkaUtils + .getDefaultActorSystemConfig()),getAkkaTimeout()); } @@ -815,6 +818,13 @@ public class CliFrontend { } return GlobalConfiguration.getConfiguration(); } + + protected FiniteDuration getAkkaTimeout(){ + Configuration config = getGlobalConfiguration(); + + return new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + } public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index d2c9983..203f294 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.PrintStream; 
import java.net.InetSocketAddress; import java.util.List; +import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -53,6 +54,7 @@ import com.google.common.base.Preconditions; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import scala.concurrent.duration.FiniteDuration; /** * Encapsulates the functionality necessary to submit a program to a remote cluster. @@ -301,12 +303,15 @@ public class Client { String hostname = configuration.getString(ConfigConstants .JOB_MANAGER_IPC_ADDRESS_KEY, null); + FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants + .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + if(hostname == null){ throw new ProgramInvocationException("Could not find hostname of job manager."); } try { - JobClient.uploadJarFiles(jobGraph, hostname, client); + JobClient.uploadJarFiles(jobGraph, hostname, client, timeout); }catch(IOException e){ throw new ProgramInvocationException("Could not upload blobs.", e); } @@ -317,7 +322,7 @@ public class Client { return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client); } else { - SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client); + SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout); if(response instanceof SubmissionFailure){ SubmissionFailure failure = (SubmissionFailure) response; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java index ba7f112..e339ec7 100644 --- 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; +import scala.concurrent.duration.FiniteDuration; public class JobsInfoServlet extends HttpServlet { @@ -51,11 +53,15 @@ public class JobsInfoServlet extends HttpServlet { private final Configuration config; private final ActorSystem system; + + private final FiniteDuration timeout; public JobsInfoServlet(Configuration flinkConfig) { this.config = flinkConfig; system = ActorSystem.create("JobsInfoServletActorSystem", AkkaUtils.getDefaultActorSystemConfig()); + this.timeout = new FiniteDuration(flinkConfig.getInteger(ConfigConstants + .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); } @Override @@ -67,10 +73,11 @@ public class JobsInfoServlet extends HttpServlet { int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system); + ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system, + timeout); Iterator<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jm, - RequestRunningJobs$.MODULE$).asJavaIterable().iterator(); + RequestRunningJobs$.MODULE$, timeout).asJavaIterable().iterator(); resp.setStatus(HttpServletResponse.SC_OK); 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java index 1de21ff..ad7b6d4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java @@ -95,7 +95,7 @@ public class WebInterfaceServer { throw new FileNotFoundException("Cannot start web interface server because the web " + "root dir " + WEB_ROOT_DIR + " is not included in the jar."); } - + String tmpDirPath = config.getString(ConfigConstants.WEB_TMP_DIR_KEY, ConfigConstants.DEFAULT_WEB_TMP_DIR); @@ -155,7 +155,8 @@ public class WebInterfaceServer { ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS); servletContext.setContextPath("/"); servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan"); - servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)), "/jobsInfo"); + servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)), + "/jobsInfo"); servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan"); servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs"); servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(nepheleConfig, uploadDir, planDumpDir)), http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java ---------------------------------------------------------------------- diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 7f81ccd..aa7f4aa 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -43,8 +43,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; import scala.Tuple2; +import scala.concurrent.duration.FiniteDuration; import java.io.IOException; +import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -105,7 +107,7 @@ public class ClientTest { @Test public void shouldSubmitToJobClient() throws ProgramInvocationException, IOException { when(mockJobClient.submitJobDetached(any(JobGraph.class), - any(ActorRef.class))).thenReturn(mockSubmissionSuccess); + any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionSuccess); Client out = new Client(configMock, getClass().getClassLoader()); out.run(program.getPlanWithJars(), -1, false); @@ -118,7 +120,7 @@ public class ClientTest { @Test(expected = ProgramInvocationException.class) public void shouldThrowException() throws Exception { when(mockJobClient.submitJobDetached(any(JobGraph.class), - any(ActorRef.class))).thenReturn(mockSubmissionFailure); + any(ActorRef.class), any(FiniteDuration.class))).thenReturn(mockSubmissionFailure); Client out = new Client(configMock, getClass().getClassLoader()); out.run(program.getPlanWithJars(), -1, false); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 4e87581..047cfd1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -346,6 +346,11 @@ public final class ConfigConstants { * Log level for akka */ public static final String AKKA_LOG_LEVEL = "akka.loglevel"; + + /** + * Timeout for all blocking calls + */ + public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout"; // ----------------------------- Miscellaneous ---------------------------- @@ -594,6 +599,8 @@ public final class ConfigConstants { public static String DEFAULT_AKKA_FRAMESIZE = "10485760b"; public static String DEFAULT_AKKA_LOG_LEVEL = "OFF"; + + public static int DEFAULT_AKKA_ASK_TIMEOUT = 100; // ----------------------------- LocalExecution ---------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/jobmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh index 78c9500..70dde8a 100755 --- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh @@ -80,7 +80,7 @@ case $STARTSTOP in rotateLogFile $out echo Starting job manager - $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null & + $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$FLINK_JM_CLASSPATH" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null & echo $! 
> $pid ;; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-dist/src/main/flink-bin/bin/start-local.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat index 05d14e5..374254f 100644 --- a/flink-dist/src/main/flink-bin/bin/start-local.bat +++ b/flink-dist/src/main/flink-bin/bin/start-local.bat @@ -57,6 +57,6 @@ if not defined FOUND ( echo Starting Flink job manager. Webinterface by default on http://localhost:8081/. echo Don't close this batch window. Stop job manager by pressing Ctrl+C. -java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster --configDir %FLINK_CONF_DIR% > "%out%" 2>&1 +java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir %FLINK_CONF_DIR% > "%out%" 2>&1 endlocal http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index a42faf3..2024d0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -27,10 +27,13 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import akka.dispatch.OnComplete; import 
akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -51,6 +54,7 @@ import org.slf4j.Logger; import com.google.common.base.Preconditions; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery, @@ -78,6 +82,9 @@ public class Execution { private static final Logger LOG = ExecutionGraph.LOG; private static final int NUM_CANCEL_CALL_TRIES = 3; + + public static FiniteDuration timeout = new FiniteDuration(ConfigConstants + .DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS); // -------------------------------------------------------------------------------------------- @@ -273,7 +280,7 @@ public class Execution { Instance instance = slot.getInstance(); Future<Object> deployAction = Patterns.ask(instance.getTaskManager(), - new TaskManagerMessages.SubmitTask(deployment),AkkaUtils.FUTURE_TIMEOUT()); + new TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout)); deployAction.onComplete(new OnComplete<Object>(){ @@ -583,7 +590,7 @@ public class Execution { Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES, - AkkaUtils.globalExecutionContext()); + AkkaUtils.globalExecutionContext(), timeout); cancelResult.onComplete(new OnComplete<Object>(){ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 99053ea..4e6a56b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -103,7 +103,6 @@ public class ExecutionGraph { private final long[] stateTimestamps; - private final Object progressLock = new Object(); private int nextVertexToFinish; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java index 9e3b23c..cd07cc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.util.ExceptionUtils; +import scala.concurrent.duration.FiniteDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,14 +79,19 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker private final DiscardBufferPool discardBufferPool; + private final FiniteDuration timeout; + // ----------------------------------------------------------------------------------------------------------------- public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers, - int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException { + int networkBufferSize, NetworkConnectionManager 
networkConnectionManager, + FiniteDuration timeout) throws IOException { this.channelLookup= channelLookup; this.connectionInfo = connectionInfo; + this.timeout = timeout; + try { this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize); } catch (Throwable e) { @@ -378,7 +384,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker try{ lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup, new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID, - sourceChannelID)).response(); + sourceChannelID), timeout).response(); }catch(IOException ioe) { throw ioe; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java index f52da0d..e1afa7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.StringUtils; import org.eclipse.jetty.io.EofException; +import scala.concurrent.duration.FiniteDuration; public class JobmanagerInfoServlet extends HttpServlet { @@ -75,11 +76,13 @@ public class JobmanagerInfoServlet extends HttpServlet { /** Underlying JobManager */ private final ActorRef jobmanager; private final ActorRef archive; + private final FiniteDuration timeout; - public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive) { + public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef 
archive, FiniteDuration timeout) { this.jobmanager = jobmanager; this.archive = archive; + this.timeout = timeout; } @@ -92,14 +95,15 @@ public class JobmanagerInfoServlet extends HttpServlet { try { if("archive".equals(req.getParameter("get"))) { List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils - .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$).asJavaCollection()); + .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout) + .asJavaCollection()); writeJsonForArchive(resp.getWriter(), archivedJobs); } else if("job".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); JobResponse response = AkkaUtils.ask(archive, - new RequestJob(JobID.fromHexString(jobId))); + new RequestJob(JobID.fromHexString(jobId)), timeout); if(response instanceof JobFound){ ExecutionGraph archivedJob = ((JobFound)response).executionGraph(); @@ -113,7 +117,7 @@ public class JobmanagerInfoServlet extends HttpServlet { String groupvertexId = req.getParameter("groupvertex"); JobResponse response = AkkaUtils.ask(archive, - new RequestJob(JobID.fromHexString(jobId))); + new RequestJob(JobID.fromHexString(jobId)), timeout); if(response instanceof JobFound && groupvertexId != null){ ExecutionGraph archivedJob = ((JobFound)response).executionGraph(); @@ -126,9 +130,9 @@ public class JobmanagerInfoServlet extends HttpServlet { } else if("taskmanagers".equals(req.getParameter("get"))) { int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager, - RequestNumberRegisteredTaskManager$.MODULE$); + RequestNumberRegisteredTaskManager$.MODULE$, timeout); int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager, - RequestTotalNumberOfSlots$.MODULE$); + RequestTotalNumberOfSlots$.MODULE$, timeout); resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " + "\"slots\": "+numberOfRegisteredSlots+"}"); @@ -136,7 +140,7 @@ public class JobmanagerInfoServlet extends HttpServlet { else 
if("cancel".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); AkkaUtils.<CancellationResponse>ask(jobmanager, - new CancelJob(JobID.fromHexString(jobId))); + new CancelJob(JobID.fromHexString(jobId)), timeout); } else if("updates".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); @@ -146,7 +150,7 @@ public class JobmanagerInfoServlet extends HttpServlet { } else{ Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask - (jobmanager, RequestRunningJobs$.MODULE$).asJavaIterable(); + (jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); writeJsonForJobs(resp.getWriter(), runningJobs); } @@ -324,7 +328,7 @@ public class JobmanagerInfoServlet extends HttpServlet { // write accumulators AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager, - new RequestAccumulatorResults(graph.getJobID())); + new RequestAccumulatorResults(graph.getJobID()), timeout); if(response instanceof AccumulatorResultsFound){ Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap(); @@ -417,7 +421,7 @@ public class JobmanagerInfoServlet extends HttpServlet { try { Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager, - RequestRunningJobs$.MODULE$).asJavaIterable(); + RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); //Serialize job to json wrt.write("{"); @@ -439,7 +443,7 @@ public class JobmanagerInfoServlet extends HttpServlet { wrt.write("],"); - JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId)); + JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout); if(response instanceof JobFound){ ExecutionGraph graph = ((JobFound)response).executionGraph(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java index c0308cb..9e0a55b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java @@ -46,6 +46,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A Servlet that displays the Configuration in the web interface. @@ -59,13 +60,15 @@ public class SetupInfoServlet extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class); - private Configuration globalC; - private ActorRef jobmanager; + final private Configuration globalC; + final private ActorRef jobmanager; + final private FiniteDuration timeout; - public SetupInfoServlet(ActorRef jm) { + public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) { globalC = GlobalConfiguration.getConfiguration(); this.jobmanager = jm; + this.timeout = timeout; } @Override @@ -104,7 +107,7 @@ public class SetupInfoServlet extends HttpServlet { private void writeTaskmanagers(HttpServletResponse resp) throws IOException { List<Instance> instances = new ArrayList<Instance>(AkkaUtils.<RegisteredTaskManagers>ask - (jobmanager, RequestRegisteredTaskManagers$.MODULE$).asJavaCollection()); + (jobmanager, RequestRegisteredTaskManagers$.MODULE$, timeout).asJavaCollection()); Collections.sort(instances, INSTANCE_SORTER); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index c397a30..7e67603 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import scala.concurrent.duration.FiniteDuration; /** @@ -64,6 +65,11 @@ public class WebInfoServer { private final Server server; /** + * Timeout for akka requests + */ + private final FiniteDuration timeout; + + /** * Port for info server */ private int port; @@ -78,10 +84,12 @@ public class WebInfoServer { * Thrown, if the server setup failed for an I/O related reason. */ public WebInfoServer(Configuration config, ActorRef jobmanager, - ActorRef archive) throws IOException { + ActorRef archive, FiniteDuration timeout) throws IOException { this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + this.timeout = timeout; + // if no explicit configuration is given, use the global configuration if (config == null) { config = GlobalConfiguration.getConfiguration(); @@ -122,9 +130,10 @@ public class WebInfoServer { ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS); servletContext.setContextPath("/"); servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager, - archive)), "/jobsInfo"); + archive, timeout)), "/jobsInfo"); servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); - servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo"); + servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager, timeout)), + "/setupInfo"); 
servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index 8049004..669f94c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages; +import scala.concurrent.duration.FiniteDuration; public class TaskInputSplitProvider implements InputSplitProvider { @@ -34,18 +35,22 @@ public class TaskInputSplitProvider implements InputSplitProvider { private final JobID jobId; private final JobVertexID vertexId; + + private final FiniteDuration timeout; - public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId) { + public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId, + FiniteDuration timeout) { this.jobManager = jobManager; this.jobId = jobId; this.vertexId = vertexId; + this.timeout = timeout; } @Override public InputSplit getNextInputSplit() { try { TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager, - new JobManagerMessages.RequestNextInputSplit(jobId, vertexId)); + new JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout); return nextInputSplit.inputSplit(); } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index ce87e0e..f931497 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -30,9 +30,7 @@ import scala.concurrent.{ExecutionContext, Future, Await} import scala.concurrent.duration._ object AkkaUtils { - implicit val FUTURE_TIMEOUT: Timeout = 100 minute - implicit val AWAIT_DURATION: FiniteDuration = 1 minute - implicit val FUTURE_DURATION: FiniteDuration = 1 minute + val DEFAULT_TIMEOUT: FiniteDuration = 1 minute val INF_TIMEOUT = 21474835 seconds @@ -122,34 +120,27 @@ object AkkaUtils { ConfigFactory.parseString(getDefaultActorSystemConfigString) } - def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem): ActorRef = { - Await.result(system.actorSelection(parent.path / child).resolveOne(), AWAIT_DURATION) + def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout: + FiniteDuration): ActorRef = { + Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout) } - def getReference(path: String)(implicit system: ActorSystem): ActorRef = { - Await.result(system.actorSelection(path).resolveOne(), AWAIT_DURATION) + def getReference(path: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef + = { + Await.result(system.actorSelection(path).resolveOne()(timeout), timeout) } @throws(classOf[IOException]) - def ask[T](actorSelection: ActorSelection, msg: Any): T = { - ask(actorSelection, msg, FUTURE_TIMEOUT, FUTURE_DURATION) - } - - @throws(classOf[IOException]) - def ask[T](actor: ActorRef, 
msg: Any): T = { - ask(actor, msg, FUTURE_TIMEOUT, FUTURE_DURATION) - } - - @throws(classOf[IOException]) - def ask[T](actorSelection: ActorSelection, msg: Any, timeout: Timeout, duration: Duration): T = { + def ask[T](actorSelection: ActorSelection, msg: Any)(implicit timeout: FiniteDuration): T + = { val future = Patterns.ask(actorSelection, msg, timeout) - Await.result(future, duration).asInstanceOf[T] + Await.result(future, timeout).asInstanceOf[T] } @throws(classOf[IOException]) - def ask[T](actor: ActorRef, msg: Any, timeout: Timeout, duration: Duration): T = { + def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = { val future = Patterns.ask(actor, msg, timeout) - Await.result(future, duration).asInstanceOf[T] + Await.result(future, timeout).asInstanceOf[T] } def askInf[T](actor: ActorRef, msg: Any): T = { @@ -174,8 +165,8 @@ object AkkaUtils { } def retry(target: ActorRef, message: Any, tries: Int)(implicit executionContext: - ExecutionContext): Future[Any] = { - (target ? message) recoverWith{ + ExecutionContext, timeout: FiniteDuration): Future[Any] = { + (target ? 
message)(timeout) recoverWith{ case t: Throwable => if(tries > 0){ retry(target, message, tries-1) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index a9733de..0d9f672 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -35,13 +35,14 @@ import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S import org.apache.flink.runtime.messages.JobManagerMessages._ import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{FiniteDuration, Duration} -class JobClient(jobManagerURL: String) extends Actor with ActorLogMessages with ActorLogging{ +class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages +with ActorLogging{ import context._ - val jobManager = AkkaUtils.getReference(jobManagerURL) + val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout) override def receiveWithLogMessages: Receive = { case SubmitJobDetached(jobGraph) => @@ -120,15 +121,16 @@ object JobClient{ } - def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef): SubmissionResponse = { - import AkkaUtils.FUTURE_TIMEOUT - val response = jobClient ? SubmitJobDetached(jobGraph) + def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit timeout: FiniteDuration): + SubmissionResponse = { + val response = (jobClient ? 
SubmitJobDetached(jobGraph))(timeout) - Await.result(response.mapTo[SubmissionResponse],AkkaUtils.FUTURE_DURATION) + Await.result(response.mapTo[SubmissionResponse],timeout) } @throws(classOf[IOException]) - def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef): Unit = { + def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(implicit timeout: + FiniteDuration): Unit = { val port = AkkaUtils.ask[Int](jobClient, RequestBlobManagerPort) val serverAddress = new InetSocketAddress(hostname, port) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 75f2a63..1fa89c1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager import java.io.File import java.net.{InetSocketAddress} +import java.util.concurrent.TimeUnit import akka.actor._ import akka.pattern.Patterns @@ -28,10 +29,11 @@ import com.google.common.base.Preconditions import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph} +import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph} import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged +import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.{JobException, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -52,7 +54,10 @@ import scala.concurrent.duration._ class JobManager(val configuration: Configuration) extends Actor with ActorLogMessages with ActorLogging with WrapAsScala { import context._ - import AkkaUtils.FUTURE_TIMEOUT + implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + + Execution.timeout = timeout; log.info("Starting job manager.") @@ -329,7 +334,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case RequestJobStatus(jobID) => { currentJobs.get(jobID) match { case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, executionGraph.getState) - case None => archive ? RequestJobStatus(jobID) pipeTo sender() + case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender() } } @@ -344,7 +349,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case RequestJob(jobID) => { currentJobs.get(jobID) match { case Some((eg, _)) => sender() ! JobFound(jobID, eg) - case None => archive ? RequestJob(jobID) pipeTo sender() + case None => (archive ? 
RequestJob(jobID))(timeout) pipeTo sender() } } @@ -384,6 +389,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { } object JobManager { + import ExecutionMode._ val LOG = LoggerFactory.getLogger(classOf[JobManager]) val FAILURE_RETURN_CODE = 1 val JOB_MANAGER_NAME = "jobmanager" @@ -392,19 +398,34 @@ object JobManager { val PROFILER_NAME = "profiler" def main(args: Array[String]): Unit = { - val (hostname, port, configuration) = parseArgs(args) + val (hostname, port, configuration, executionMode) = parseArgs(args) val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration) startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem) + + if(executionMode.equals(LOCAL)){ + TaskManager.startActorWithConfiguration(hostname, configuration, true)(jobManagerSystem) + } + jobManagerSystem.awaitTermination() + println("Shutting down.") } - def parseArgs(args: Array[String]): (String, Int, Configuration) = { + def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = { val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") { head("flink jobmanager") opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " + "configuration directory.") + opt[String]("executionMode") optional() action { (x, c) => + if(x.equals("local")){ + c.copy(executionMode = LOCAL) + }else{ + c.copy(executionMode = CLUSTER) + } + } text { + "Specify execution mode of job manager" + } } parser.parse(args, JobManagerCLIConfiguration()) map { @@ -419,7 +440,7 @@ object JobManager { val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - (hostname, port, configuration) + (hostname, port, configuration, config.executionMode) } getOrElse { LOG.error("CLI Parsing failed. 
Usage: " + parser.usage) sys.exit(FAILURE_RETURN_CODE) @@ -456,19 +477,23 @@ object JobManager { s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}" } - def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { + def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration): + ActorRef = { AkkaUtils.getChild(jobManager, PROFILER_NAME) } - def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { + def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem, timeout: + FiniteDuration): ActorRef = { AkkaUtils.getChild(jobManager, EVENT_COLLECTOR_NAME) } - def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { + def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration): + ActorRef = { AkkaUtils.getChild(jobManager, ARCHIVE_NAME) } - def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem): ActorRef = { + def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout: + FiniteDuration): ActorRef = { AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort)) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala index d588f95..d08aecc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala @@ -18,5 +18,11 @@ package org.apache.flink.runtime.jobmanager -case class JobManagerCLIConfiguration(configDir: 
String = null) { +object ExecutionMode extends Enumeration{ + type ExecutionMode = Value + val LOCAL = Value + val CLUSTER = Value } + +case class JobManagerCLIConfiguration(configDir: String = null, executionMode: ExecutionMode +.ExecutionMode = ExecutionMode.CLUSTER) {} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala index 81fecbf..715fc0c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala @@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer trait WithWebServer extends Actor { that: JobManager => - val webServer = new WebInfoServer(configuration,self, archive) + val webServer = new WebInfoServer(configuration,self, archive, timeout) webServer.start() abstract override def postStop(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 35def3d..6d0da27 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.minicluster +import java.util.concurrent.TimeUnit + import akka.pattern.ask 
import akka.actor.{ActorRef, ActorSystem} import org.apache.flink.api.common.io.FileOutputFormat @@ -27,11 +29,15 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere import org.apache.flink.runtime.util.EnvironmentInformation import org.slf4j.LoggerFactory +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Future, Await} abstract class FlinkMiniCluster(userConfiguration: Configuration) { import FlinkMiniCluster._ + implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants + .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + val configuration = generateConfiguration(userConfiguration) val jobManagerActorSystem = startJobManagerActorSystem() @@ -92,19 +98,18 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { } def awaitTermination(): Unit = { - taskManagerActorSystems foreach { _.awaitTermination(AkkaUtils.AWAIT_DURATION)} - jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION) + taskManagerActorSystems foreach { _.awaitTermination()} + jobManagerActorSystem.awaitTermination() } def waitForTaskManagersToBeRegistered(): Unit = { - implicit val timeout = AkkaUtils.FUTURE_TIMEOUT implicit val executionContext = AkkaUtils.globalExecutionContext val futures = taskManagerActors map { - _ ? NotifyWhenRegisteredAtJobManager + taskManager => (taskManager ? 
NotifyWhenRegisteredAtJobManager)(timeout) } - Await.ready(Future.sequence(futures), AkkaUtils.AWAIT_DURATION) + Await.ready(Future.sequence(futures), timeout) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 8625983..fb6e36b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -118,40 +118,4 @@ FlinkMiniCluster(userConfiguration){ object LocalFlinkMiniCluster{ val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster]) - val FAILURE_RETURN_CODE = 1 - - def main(args: Array[String]): Unit = { - val configuration = parseArgs(args) - - val cluster = new LocalFlinkMiniCluster(configuration) - - cluster.awaitTermination() - } - - def parseArgs(args: Array[String]): Configuration = { - val parser = new OptionParser[LocalFlinkMiniClusterConfiguration]("LocalFlinkMiniCluster") { - head("LocalFlinkMiniCluster") - opt[String]("configDir") action { (value, config) => config.copy(configDir = value) } text - {"Specify configuration directory."} - } - - parser.parse(args, LocalFlinkMiniClusterConfiguration()) map { - config =>{ - GlobalConfiguration.loadConfiguration(config.configDir) - val configuration = GlobalConfiguration.getConfiguration - - if(config.configDir != null && new File(config.configDir).isDirectory){ - configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..") - } - - configuration - } - } getOrElse{ - LOG.error("CLI parsing failed. 
Usage: " + parser.usage) - sys.exit(FAILURE_RETURN_CODE) - } - } - - - case class LocalFlinkMiniClusterConfiguration(val configDir: String = "") } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 136dc7f..66d25c5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -69,8 +69,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala { import context._ - import AkkaUtils.FUTURE_TIMEOUT - import taskManagerConfig._ + import taskManagerConfig.{timeout => tmTimeout, _} + implicit val timeout = tmTimeout + log.info(s"Starting task manager at ${self.path}.") @@ -172,7 +173,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) } else { log.error("TaskManager could not register at JobManager."); - throw new RuntimeException("TaskManager could not register at JobManager"); + self ! 
PoisonPill } } @@ -182,6 +183,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka currentJobManager = sender() instanceID = id + context.watch(currentJobManager) + log.info(s"TaskManager successfully registered at JobManager ${ currentJobManager.path .toString @@ -247,7 +250,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka case None => } - val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID) + val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout) val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager, ioManager, splitProvider,currentJobManager) @@ -356,14 +359,19 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka log.error(s"Cannot find task with ID ${executionID} to unregister.") } } + + case Terminated(jobManager) => { + log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.") + tryJobManagerRegistration() + } } def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID, executionState: ExecutionState, optionalError: Throwable): Unit = { log.info(s"Update execution state to ${executionState}.") - val futureResponse = currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState - (jobID, executionID, executionState, optionalError)) + val futureResponse = (currentJobManager ? 
UpdateTaskExecutionState(new TaskExecutionState + (jobID, executionID, executionState, optionalError)))(timeout) val receiver = this.self @@ -402,7 +410,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka } channelManager = Some(new ChannelManager(currentJobManager, connectionInfo, numBuffers, - bufferSize, connectionManager)) + bufferSize, connectionManager, timeout)) } catch { case ioe: IOException => log.error(ioe, "Failed to instantiate ChannelManager.") @@ -412,7 +420,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka def setupLibraryCacheManager(blobPort: Int): Unit = { if(blobPort > 0){ - val address = new InetSocketAddress(currentJobManager.path.address.host.get, blobPort) + val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse + ("localhost"), blobPort) libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval) }else{ libraryCacheManager = new FallbackLibraryCacheManager @@ -598,8 +607,11 @@ object TaskManager { .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 + val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize, - tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval) + tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout) (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala index 150db73..a6a76a3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala @@ -18,7 +18,10 @@ package org.apache.flink.runtime.taskmanager +import scala.concurrent.duration.FiniteDuration + case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int, tmpDirPaths: Array[String], cleanupInterval: Long, memoryLogggingIntervalMs: Option[Long], - profilingInterval: Option[Long]) + profilingInterval: Option[Long], + timeout: FiniteDuration) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index c6b4fb5..b76944b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -32,6 +32,7 @@ import akka.actor.UntypedActor; import akka.japi.Creator; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; +import akka.util.Timeout; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -73,6 +74,8 @@ public class TaskManagerTest { private static ActorSystem system; + private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES); + @BeforeClass public static void setup(){ system = 
ActorSystem.create("TestActorSystem", TestingUtils.testConfig()); @@ -178,7 +181,7 @@ public class TaskManagerTest { expectMsgEquals(new TaskOperationResult(eid1, true)); Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); assertEquals(ExecutionState.CANCELED, t1.getExecutionState()); @@ -197,7 +200,7 @@ public class TaskManagerTest { expectMsgEquals(new TaskOperationResult(eid2, true)); response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); assertEquals(ExecutionState.CANCELED, t2.getExecutionState()); @@ -336,13 +339,13 @@ public class TaskManagerTest { // we get to the check, so we need to guard the check if (t1 != null) { Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); } if (t2 != null) { Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); assertEquals(ExecutionState.FINISHED, t2.getExecutionState()); } @@ -424,7 +427,7 @@ public class TaskManagerTest { if (t2 != null) { Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); } @@ -434,7 +437,7 @@ public class TaskManagerTest { expectMsgEquals(new TaskOperationResult(eid1, true)); } Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), - AkkaUtils.FUTURE_TIMEOUT()); + timeout); Await.ready(response, d); } @@ -538,7 +541,7 @@ public class TaskManagerTest { ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system); Future<Object> response = 
Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$, - AkkaUtils.FUTURE_TIMEOUT()); + timeout); try { FiniteDuration d = new FiniteDuration(2, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index e23992c..6689f93 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -37,6 +37,8 @@ import scala.concurrent.duration._ @RunWith(classOf[JUnitRunner]) class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { + implicit val timeout = 1 minute + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8d0a0aa/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java index 2e08dc3..6347cb5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java @@ -23,9 +23,8 @@ import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.dispatch.ExecutionContexts; import akka.pattern.Patterns; -import 
com.amazonaws.http.ExecutionContext; +import akka.util.Timeout; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -116,7 +115,7 @@ public abstract class CancellingTestBase { boolean jobSuccessfullyCancelled = false; Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait - (jobGraph, false), AkkaUtils.FUTURE_TIMEOUT()); + (jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT())); actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling, TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()), @@ -125,7 +124,7 @@ public abstract class CancellingTestBase { throw new IllegalStateException("Job restarted"); try { - Await.result(result, AkkaUtils.AWAIT_DURATION()); + Await.result(result, AkkaUtils.DEFAULT_TIMEOUT()); } catch (JobExecutionException exception) { if (!exception.isJobCanceledByUser()) { throw new IllegalStateException("Job Failed.");
