Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output during Maven verify builds.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8eadd3ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8eadd3ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8eadd3ec Branch: refs/heads/master Commit: 8eadd3ec2e07aa5eec6daccc6849b1038bbeae5f Parents: c175ebe Author: Till Rohrmann <[email protected]> Authored: Mon Nov 17 17:06:13 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:31 2014 +0100 ---------------------------------------------------------------------- .../flink/streaming/util/ClusterUtil.java | 1 - .../main/java/org/apache/flink/yarn/Client.java | 9 +- .../apache/flink/yarn/ApplicationClient.scala | 4 +- .../apache/flink/yarn/ApplicationMaster.scala | 2 +- .../org/apache/flink/client/LocalExecutor.java | 8 +- .../src/main/flink-bin/bin/start-cluster.sh | 2 +- .../src/main/flink-bin/bin/start-local.sh | 2 +- .../src/main/flink-bin/bin/stop-cluster.sh | 2 +- flink-dist/src/main/flink-bin/bin/stop-local.sh | 2 +- .../web-docs-infoserver/js/jquery-2.1.0.js | 9111 ------------------ .../apache/flink/runtime/blob/BlobClient.java | 6 +- .../apache/flink/runtime/blob/BlobServer.java | 18 +- .../runtime/execution/ExecutionListener.java | 33 - .../runtime/executiongraph/ExecutionGraph.java | 40 - .../executiongraph/JobStatusListener.java | 36 - .../runtime/profiling/TaskManagerProfiler.java | 13 +- .../apache/flink/runtime/taskmanager/Task.java | 31 +- .../web-docs-infoserver/js/jquery-1.10.2.js | 6 - .../web-docs-infoserver/js/jquery-2.1.0.js | 9111 ++++++++++++++++++ .../flink/runtime/taskmanager/TaskManager.scala | 6 +- .../clients/examples/LocalExecutorITCase.java | 1 + ...terationNotDependingOnSolutionSetITCase.java | 44 +- 22 files changed, 9183 insertions(+), 9305 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index a7b7137..f75db68 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.util; import java.net.InetSocketAddress; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java index b7c8e51..1de61a8 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; import akka.actor.ActorRef; @@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; +import scala.concurrent.duration.FiniteDuration; /** * All classes in this package contain code taken from @@ -328,6 +330,9 @@ public class Client { LOG.warn("Unable to find job manager port in configuration!"); jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT; } + FiniteDuration timeout = new FiniteDuration(GlobalConfiguration.getInteger + (ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), + TimeUnit.SECONDS); conf = Utils.initializeYarnConfiguration(); @@ -520,7 +525,6 @@ public class Client { // file that we write into the conf/ dir containing the jobManager address and the dop. yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE); - LOG.info("Submitting application master " + appId); yarnClient.submitApplication(appContext); @@ -533,7 +537,8 @@ public class Client { // start application client LOG.info("Start application client."); applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort, - yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded)); + yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded, + timeout)); actorSystem.awaitTermination(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index d80f133..58ce6cf 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -35,7 +35,7 @@ import scala.concurrent.duration._ class 
ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient, confDirPath: String, slots: Int, numTaskManagers: Int, - dynamicPropertiesEncoded: String) + dynamicPropertiesEncoded: String, timeout: FiniteDuration) extends Actor with Consumer with ActorLogMessages with ActorLogging { import context._ @@ -85,7 +85,7 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient, writeYarnProperties(address) - jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))) + jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout)) jobManager.get ! RegisterMessageListener pollingTimer foreach { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 628db6d..64db0ad 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -159,7 +159,7 @@ object ApplicationMaster{ val args = Array[String]("--configDir", pathToConfig) LOG.info(s"Config path: ${pathToConfig}.") - val (hostname, port, configuration) = JobManager.parseArgs(args) + val (hostname, port, configuration, _) = JobManager.parseArgs(args) implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index bc021c1..55fda89 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -60,6 +60,8 @@ public class LocalExecutor extends PlanExecutor { private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS; private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE; + + private boolean printStatusDuringExecution = true; // -------------------------------------------------------------------------------------------- @@ -82,6 +84,10 @@ public class LocalExecutor extends PlanExecutor { public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; } public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; } + + public void setPrintStatusDuringExecution(boolean printStatus) { + this.printStatusDuringExecution = printStatus; + } // -------------------------------------------------------------------------------------------- @@ -164,7 +170,7 @@ public class LocalExecutor extends PlanExecutor { ActorRef jobClient = flink.getJobClient(); - return JobClient.submitJobAndWait(jobGraph, true, jobClient); + return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient); } finally { if (shutDownAtEnd) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-cluster.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh index fe97899..c447edb 100755 --- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh +++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh @@ -35,7 +35,7 @@ if [ ! 
-f "$HOSTLIST" ]; then fi # cluster mode, bring up job manager locally and a task manager on every slave host -"$FLINK_BIN_DIR"/jobManager.sh start cluster +"$FLINK_BIN_DIR"/jobmanager.sh start cluster GOON=true while $GOON http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-local.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh index 83f621a..f382763 100755 --- a/flink-dist/src/main/flink-bin/bin/start-local.sh +++ b/flink-dist/src/main/flink-bin/bin/start-local.sh @@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd` . "$bin"/config.sh # local mode, only bring up job manager. The job manager will start an internal task manager -"$FLINK_BIN_DIR"/jobManager.sh start local +"$FLINK_BIN_DIR"/jobmanager.sh start local http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh index 5fba480..d9fe6f6 100755 --- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh +++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh @@ -46,4 +46,4 @@ do done < $HOSTLIST # cluster mode, stop the job manager locally and stop the task manager on every slave host -"$FLINK_BIN_DIR"/jobManager.sh stop +"$FLINK_BIN_DIR"/jobmanager.sh stop http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-local.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/stop-local.sh b/flink-dist/src/main/flink-bin/bin/stop-local.sh index c576723..79627fa 100755 --- a/flink-dist/src/main/flink-bin/bin/stop-local.sh +++ b/flink-dist/src/main/flink-bin/bin/stop-local.sh @@ 
-24,4 +24,4 @@ bin=`cd "$bin"; pwd` . "$bin"/config.sh # stop local job manager (has an internal task manager) -"$FLINK_BIN_DIR"/jobManager.sh stop +"$FLINK_BIN_DIR"/jobmanager.sh stop
