Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently within futures
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0516d266 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0516d266 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0516d266 Branch: refs/heads/master Commit: 0516d266d21bcd9d17199880a0d5ebe6d9760fda Parents: c93d9ea Author: Till Rohrmann <[email protected]> Authored: Tue Dec 16 12:44:17 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:32 2014 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 2 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 4 ++-- .../flink/runtime/jobmanager/JobManager.scala | 20 ++++++++++++-------- .../flink/test/util/AbstractTestBase.java | 1 + 4 files changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index bbbe6d4..2d97504 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -580,7 +580,7 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ - public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "10 s"; + public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s"; public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 229c229..97ce343 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -206,9 +206,9 @@ object AkkaUtils { | | loggers = ["akka.event.slf4j.Slf4jLogger"] | logger-startup-timeout = 30s - | loglevel = "DEBUG" + | loglevel = "WARNING" | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - | stdout-loglevel = "DEBUG" + | stdout-loglevel = "WARNING" | jvm-exit-on-fatal-error = off | log-config-on-start = off | serialize-messages = on http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 8752bef..17f123a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -133,10 +133,12 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { // Create the user code class loader libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) + val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) + val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(), (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, - jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(), - System.currentTimeMillis()))) + jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, userCodeLoader), + JobInfo(sender(), System.currentTimeMillis()))) val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){ jobGraph.getNumberOfExecutionRetries @@ -147,8 +149,6 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { executionGraph.setNumberOfRetriesLeft(jobNumberRetries) executionGraph.setDelayBeforeRetrying(delayBetweenRetries) - val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) - if (userCodeLoader == null) { throw new JobException("The user code class loader could not be initialized.") } @@ -253,7 +253,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => val originalSender = sender() - originalSender ! executionGraph.updateState(taskExecutionState) + Future { + originalSender ! executionGraph.updateState(taskExecutionState) + } case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState .getJobID} to change state to ${taskExecutionState.getExecutionState}.") sender() ! false @@ -350,9 +352,11 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { currentJobs.get(jobID) match { case Some((executionGraph, _)) => val originalSender = sender() - originalSender ! ConnectionInformation( - executionGraph.lookupConnectionInfoAndDeployReceivers - (connectionInformation, sourceChannelID)) + Future { + originalSender ! ConnectionInformation( + executionGraph.lookupConnectionInfoAndDeployReceivers + (connectionInformation, sourceChannelID)) + } case None => log.error(s"Cannot find execution graph for job ID ${jobID}.") sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound()) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index b24b3ee..b4da64d 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration; import com.google.common.base.Charsets; import com.google.common.io.Files; +import org.junit.Assert; import scala.concurrent.duration.FiniteDuration; public abstract class AbstractTestBase extends TestBaseUtils {
