Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently 
within futures


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0516d266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0516d266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0516d266

Branch: refs/heads/master
Commit: 0516d266d21bcd9d17199880a0d5ebe6d9760fda
Parents: c93d9ea
Author: Till Rohrmann <[email protected]>
Authored: Tue Dec 16 12:44:17 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:32 2014 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  4 ++--
 .../flink/runtime/jobmanager/JobManager.scala   | 20 ++++++++++++--------
 .../flink/test/util/AbstractTestBase.java       |  1 +
 4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index bbbe6d4..2d97504 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -580,7 +580,7 @@ public final class ConfigConstants {
        
        // ------------------------------ Akka Values 
------------------------------
 
-       public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "10 s";
+       public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
 
        public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 
s";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 229c229..97ce343 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -206,9 +206,9 @@ object AkkaUtils {
       |
       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
       |  logger-startup-timeout = 30s
-      |  loglevel = "DEBUG"
+      |  loglevel = "WARNING"
       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-      |  stdout-loglevel = "DEBUG"
+      |  stdout-loglevel = "WARNING"
       |  jvm-exit-on-fatal-error = off
       |  log-config-on-start = off
       |  serialize-messages = on

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8752bef..17f123a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -133,10 +133,12 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           // Create the user code class loader
           libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
 
+          val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
+
           val (executionGraph, jobInfo) = 
currentJobs.getOrElseUpdate(jobGraph.getJobID(),
             (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), 
JobInfo(sender(),
-              System.currentTimeMillis())))
+              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, 
userCodeLoader),
+              JobInfo(sender(), System.currentTimeMillis())))
 
           val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
             jobGraph.getNumberOfExecutionRetries
@@ -147,8 +149,6 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
           executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
 
-          val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
-
           if (userCodeLoader == null) {
             throw new JobException("The user code class loader could not be 
initialized.")
           }
@@ -253,7 +253,9 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
             val originalSender = sender()
-            originalSender ! executionGraph.updateState(taskExecutionState)
+            Future {
+              originalSender ! executionGraph.updateState(taskExecutionState)
+            }
           case None => log.error(s"Cannot find execution graph for ID 
${taskExecutionState
             .getJobID} to change state to 
${taskExecutionState.getExecutionState}.")
             sender() ! false
@@ -350,9 +352,11 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
           val originalSender = sender()
-          originalSender ! ConnectionInformation(
-            executionGraph.lookupConnectionInfoAndDeployReceivers
-              (connectionInformation, sourceChannelID))
+          Future {
+            originalSender ! ConnectionInformation(
+              executionGraph.lookupConnectionInfoAndDeployReceivers
+                (connectionInformation, sourceChannelID))
+          }
         case None =>
           log.error(s"Cannot find execution graph for job ID ${jobID}.")
           sender() ! 
ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b24b3ee..b4da64d 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import org.junit.Assert;
 import scala.concurrent.duration.FiniteDuration;
 
 public abstract class AbstractTestBase extends TestBaseUtils {

Reply via email to