[FLINK-1415] [runtime] Akka cleanups

Replace akka.jobmanager.url with a non-exposed mechanism. Add heuristics to 
calculate different timeouts based on a single value.

Harmonize Scala coding style: remove redundant braces and parentheses, remove 
meaningless code statements, standardize access patterns, name boolean 
parameters, remove unnecessary semicolons, and remove unnecessary braces in 
the import section.

Adds death-watch test cases: test that the JobManager detects a failing 
TaskManager, and that the TaskManager detects a failing JobManager and tries 
to reconnect to it.

Refactors the notifyExecutionStateChange method to avoid accessing the 
TaskManager's internal state from outside.

This closes #319.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4046819b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4046819b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4046819b

Branch: refs/heads/master
Commit: 4046819b380b8dfa57d52c6d314f389546a159a3
Parents: 6372063
Author: Till Rohrmann <[email protected]>
Authored: Tue Jan 6 11:15:30 2015 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Feb 5 14:47:12 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   8 +-
 .../org/apache/flink/client/program/Client.java |   8 +-
 .../flink/client/web/JobsInfoServlet.java       |   6 +-
 .../apache/flink/client/program/ClientTest.java |   3 +-
 .../flink/configuration/ConfigConstants.java    |  15 +-
 .../examples/scala/clustering/KMeans.scala      |   6 +-
 .../scala/graph/ConnectedComponents.scala       |   6 +-
 .../scala/graph/EnumTrianglesBasic.scala        |   6 +-
 .../examples/scala/graph/EnumTrianglesOpt.scala |   6 +-
 .../examples/scala/graph/PageRankBasic.scala    |   6 +-
 .../examples/scala/ml/LinearRegression.scala    |   6 +-
 .../examples/scala/wordcount/WordCount.scala    |   3 +-
 .../flink/runtime/executiongraph/Execution.java |   1 -
 .../scheduler/NoResourceAvailableException.java |   2 +-
 .../jobmanager/web/LogfileInfoServlet.java      |   6 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  28 +-
 .../apache/flink/runtime/ActorLogMessages.scala |   7 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 325 +++++++++-----
 .../apache/flink/runtime/client/JobClient.scala | 153 +++++--
 .../flink/runtime/jobmanager/JobInfo.scala      |  13 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 445 +++++++++++--------
 .../runtime/jobmanager/JobManagerProfiler.scala |  12 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |  47 +-
 .../runtime/jobmanager/WithWebServer.scala      |   3 +
 .../runtime/messages/ArchiveMessages.scala      |   4 +-
 .../messages/ExecutionGraphMessages.scala       |   6 +-
 .../runtime/messages/JobmanagerMessages.scala   |  20 +-
 .../runtime/messages/TaskManagerMessages.scala  |  10 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  55 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  25 +-
 .../taskmanager/MemoryUsageLogging.scala        |  21 -
 .../flink/runtime/taskmanager/TaskManager.scala | 324 ++++++++------
 .../TaskManagerCLIConfiguration.scala           |   6 +
 .../taskmanager/TaskManagerProfiler.scala       |  37 +-
 .../executiongraph/AllVerticesIteratorTest.java |   8 +-
 .../ExecutionGraphConstructionTest.java         |  17 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +-
 .../ExecutionStateProgressTest.java             |   2 +-
 .../ExecutionVertexCancelTest.java              |  26 +-
 .../ExecutionVertexDeploymentTest.java          |  14 +-
 .../ExecutionVertexSchedulingTest.java          |   6 +-
 .../executiongraph/PointwisePatternTest.java    |  14 +-
 .../executiongraph/VertexSlotSharingTest.java   |   2 +-
 .../runtime/instance/AllocatedSlotTest.java     | 141 ------
 .../flink/runtime/instance/SimpleSlotTest.java  | 141 ++++++
 .../runtime/taskmanager/TaskManagerTest.java    |   6 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  53 ++-
 .../ExecutionGraphRestartTest.scala             |   6 +-
 .../TaskManagerLossFailsTasksTest.scala         |   2 +-
 .../jobmanager/CoLocationConstraintITCase.scala |   2 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  72 +++
 .../runtime/jobmanager/JobManagerITCase.scala   |   4 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   2 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   2 +-
 .../jobmanager/TaskManagerFailsITCase.scala     |  37 +-
 .../TaskManagerRegistrationITCase.scala         |   6 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala |   4 +-
 .../runtime/testingUtils/TestingCluster.scala   |  20 +-
 .../testingUtils/TestingJobManager.scala        |  22 +-
 .../TestingJobManagerMessages.scala             |   3 +
 .../testingUtils/TestingMemoryArchivist.scala   |   3 +
 .../testingUtils/TestingTaskManager.scala       |  35 +-
 .../TestingTaskManagerMessages.scala            |   9 +-
 .../runtime/testingUtils/TestingUtils.scala     |  88 ++--
 .../apache/flink/api/scala/ClosureCleaner.scala |   2 +-
 .../apache/flink/api/scala/CoGroupDataSet.scala |   3 +-
 .../flink/api/scala/codegen/TreeGen.scala       |   2 +-
 .../api/scala/codegen/TypeInformationGen.scala  |   3 +-
 .../scala/typeutils/CaseClassSerializer.scala   |   3 +-
 .../socket/SocketTextStreamWordCount.scala      |   2 +
 .../examples/windowing/TopSpeedWindowing.scala  |   5 +-
 .../scala/examples/windowing/WindowJoin.scala   |   1 +
 .../api/scala/StreamJoinOperator.scala          |   2 +-
 .../flink/streaming/api/scala/package.scala     |   2 +
 .../java/org/apache/flink/tachyon/HDFSTest.java |   3 -
 .../flink/test/util/AbstractTestBase.java       |   8 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   4 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../test/cancelling/CancellingTestBase.java     |   4 +-
 .../scala/functions/ClosureCleanerITCase.scala  |  18 +-
 .../api/scala/operators/AggregateITCase.scala   |   6 +-
 .../api/scala/operators/CoGroupITCase.scala     |  28 +-
 .../flink/api/scala/operators/CrossITCase.scala |  20 +-
 .../api/scala/operators/DistinctITCase.scala    |  18 +-
 .../api/scala/operators/ExamplesITCase.scala    |  10 +-
 .../api/scala/operators/FilterITCase.scala      |  16 +-
 .../api/scala/operators/FirstNITCase.scala      |   8 +-
 .../api/scala/operators/FlatMapITCase.scala     |  16 +-
 .../api/scala/operators/GroupReduceITCase.scala |  73 +--
 .../flink/api/scala/operators/JoinITCase.scala  |  42 +-
 .../flink/api/scala/operators/MapITCase.scala   |  20 +-
 .../api/scala/operators/PartitionITCase.scala   |  16 +-
 .../api/scala/operators/ReduceITCase.scala      |  22 +-
 .../api/scala/operators/SumMinMaxITCase.scala   |   8 +-
 .../flink/api/scala/operators/UnionITCase.scala |   8 +-
 .../tuple/base/PairComparatorTestBase.scala     |   6 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   1 -
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   5 +-
 .../yarn/appMaster/YarnTaskManagerRunner.java   |   5 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  38 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   9 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  29 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   3 +-
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  48 +-
 .../java/org/apache/flink/yarn/UtilsTests.java  |   1 -
 106 files changed, 1666 insertions(+), 1238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index dfd2f70..3455fcd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -33,7 +33,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -796,8 +795,8 @@ public class CliFrontend {
                }
 
                return 
JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr),
-                               ActorSystem.create("CliFrontendActorSystem", 
AkkaUtils
-                                               
.getDefaultActorSystemConfig()),getAkkaTimeout());
+                               ActorSystem.create("CliFrontendActorSystem",
+                                               
AkkaUtils.getDefaultAkkaConfig()),getAkkaTimeout());
        }
        
 
@@ -867,8 +866,7 @@ public class CliFrontend {
        protected FiniteDuration getAkkaTimeout(){
                Configuration config = getGlobalConfiguration();
 
-               return new 
FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), 
TimeUnit.SECONDS);
+               return AkkaUtils.getTimeout(config);
        }
        
        public static List<Tuple2<String, String>> getDynamicProperties(String 
dynamicPropertiesEncoded) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index f1444ff..9b95d41 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -305,14 +305,14 @@ public class Client {
        }
 
        public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws 
ProgramInvocationException {
-               Tuple2<ActorSystem, ActorRef> pair = 
JobClient.startActorSystemAndActor(configuration);
+               Tuple2<ActorSystem, ActorRef> pair = 
JobClient.startActorSystemAndActor(configuration,
+                               false);
 
                ActorRef client = pair._2();
 
                String hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 
-               FiniteDuration timeout = new 
FiniteDuration(configuration.getInteger(ConfigConstants
-                               .AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+               FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
 
                if(hostname == null){
                        throw new ProgramInvocationException("Could not find 
hostname of job manager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
index e339ec7..f83e9b4 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -59,9 +58,8 @@ public class JobsInfoServlet extends HttpServlet {
        public JobsInfoServlet(Configuration flinkConfig) {
                this.config = flinkConfig;
                system = ActorSystem.create("JobsInfoServletActorSystem",
-                               AkkaUtils.getDefaultActorSystemConfig());
-               this.timeout = new 
FiniteDuration(flinkConfig.getInteger(ConfigConstants
-                               .AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+                               AkkaUtils.getDefaultAkkaConfig());
+               this.timeout = AkkaUtils.getTimeout(flinkConfig);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ea7780c..ba520c3 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -84,6 +84,7 @@ public class ClientTest {
 
                
when(configMock.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
null)).thenReturn("localhost");
                
when(configMock.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)).thenReturn(6123);
+               when(configMock.getString(ConfigConstants.AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).thenReturn(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
 
                when(planMock.getJobName()).thenReturn("MockPlan");
 //             when(mockJarFile.getAbsolutePath()).thenReturn("mockFilePath");
@@ -99,7 +100,7 @@ public class ClientTest {
 
                Whitebox.setInternalState(JobClient$.class, mockJobClient);
 
-               
when(mockJobClient.startActorSystemAndActor(configMock)).thenReturn(new 
Tuple2<ActorSystem,
+               when(mockJobClient.startActorSystemAndActor(configMock, 
false)).thenReturn(new Tuple2<ActorSystem,
                                ActorRef>(mockSystem, mockJobClientActor));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a2f2c83..87a3d5f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -65,11 +65,6 @@ public final class ConfigConstants {
         * for communication with the job manager.
         */
        public static final String JOB_MANAGER_IPC_PORT_KEY = 
"jobmanager.rpc.port";
-
-       /**
-        * The config parameter defining the akka url of the job manager
-        */
-       public static final String JOB_MANAGER_AKKA_URL = "jobmanager.akka.url";
        
        /**
         * The config parameter defining the number of handler threads for the 
jobmanager RPC service.
@@ -594,29 +589,21 @@ public final class ConfigConstants {
        
        // ------------------------------ Akka Values 
------------------------------
 
-       public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
-
        public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 
s";
 
        public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
 
        public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
-       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
-
-       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m";
-
        public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 
-       public static String DEFAULT_AKKA_TCP_TIMEOUT = "100 s";
-
        public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
        public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
 
        public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
-       public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
+       public static String DEFAULT_AKKA_ASK_TIMEOUT = "100 s";
        
 
        // ----------------------------- LocalExecution 
----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 11430e9..26d01c3 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -112,10 +112,13 @@ object KMeans {
         centersPath = programArguments(1)
         outputPath = programArguments(2)
         numIterations = Integer.parseInt(programArguments(3))
+
+        true
       }
       else {
         System.err.println("Usage: KMeans <points path> <centers path> <result 
path> <num " +
           "iterations>")
+
         false
       }
     }
@@ -128,8 +131,9 @@ object KMeans {
         "program.")
       System.out.println("  Usage: KMeans <points path> <centers path> <result 
path> <num " +
         "iterations>")
+
+      true
     }
-    true
   }
 
   private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index 2bb6916..e75c862 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -113,9 +113,12 @@ object ConnectedComponents {
         edgesPath = args(1)
         outputPath = args(2)
         maxIterations = args(3).toInt
+
+        true
       } else {
         System.err.println("Usage: ConnectedComponents <vertices path> <edges 
path> <result path>" +
           " <max number of iterations>")
+
         false
       }
     } else {
@@ -123,8 +126,9 @@ object ConnectedComponents {
       System.out.println("  Provide parameters to read input data from a 
file.")
       System.out.println("  Usage: ConnectedComponents <vertices path> <edges 
path> <result path>" +
         " <max number of iterations>")
+
+      true
     }
-    true
   }
 
   private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index 2623c0c..a62786c 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -148,8 +148,11 @@ object EnumTrianglesBasic {
       if (args.length == 2) {
         edgePath = args(0)
         outputPath = args(1)
+
+        true
       } else {
         System.err.println("Usage: EnumTriangleBasic <edge path> <result 
path>")
+
         false
       }
     } else {
@@ -157,8 +160,9 @@ object EnumTrianglesBasic {
       System.out.println("  Provide parameters to read input data from files.")
       System.out.println("  See the documentation for the correct format of 
input files.")
       System.out.println("  Usage: EnumTriangleBasic <edge path> <result 
path>")
+
+      true
     }
-    true
   }
 
   private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index 64aeb77..244e968 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -215,8 +215,11 @@ object EnumTrianglesOpt {
       if (args.length == 2) {
         edgePath = args(0)
         outputPath = args(1)
+
+        true
       } else {
         System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")
+
         false
       }
     } else {
@@ -224,8 +227,9 @@ object EnumTrianglesOpt {
       System.out.println("  Provide parameters to read input data from files.")
       System.out.println("  See the documentation for the correct format of 
input files.")
       System.out.println("  Usage: EnumTriangleBasic <edge path> <result 
path>")
+
+      true
     }
-    true
   }
 
   private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index dc42ed1..a3ea4b3 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -159,9 +159,12 @@ object PageRankBasic {
         outputPath = args(2)
         numPages = args(3).toLong
         maxIterations = args(4).toInt
+
+        true
       } else {
         System.err.println("Usage: PageRankBasic <pages path> <links path> 
<output path> <num " +
           "pages> <num iterations>")
+
         false
       }
     } else {
@@ -173,8 +176,9 @@ object PageRankBasic {
         "pages> <num iterations>")
 
       numPages = PageRankData.getNumberOfPages
+
+      true
     }
-    true
   }
 
   private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index 508d677..8efb63b 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -150,9 +150,12 @@ object LinearRegression {
         dataPath = programArguments(0)
         outputPath = programArguments(1)
         numIterations = programArguments(2).toInt
+
+        true
       }
       else {
         System.err.println("Usage: LinearRegression <data path> <result path> 
<num iterations>")
+
         false
       }
     }
@@ -164,8 +167,9 @@ object LinearRegression {
       System.out.println("  We provide a data generator to create synthetic 
input files for this " +
         "program.")
       System.out.println("  Usage: LinearRegression <data path> <result path> 
<num iterations>")
+
+      true
     }
-    true
   }
 
   private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
index 49e1a48..b5c2ee2 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
@@ -70,6 +70,7 @@ object WordCount {
       if (args.length == 2) {
         textPath = args(0)
         outputPath = args(1)
+        true
       } else {
         System.err.println("Usage: WordCount <text path> <result path>")
         false
@@ -78,8 +79,8 @@ object WordCount {
       System.out.println("Executing WordCount example with built-in default 
data.")
       System.out.println("  Provide parameters to read input data from a 
file.")
       System.out.println("  Usage: WordCount <text path> <result path>")
+      true
     }
-    true
   }
 
   private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 27977c3..8c5c673 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.PartitionInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 2b86c43..ebf37d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -35,7 +35,7 @@ public class NoResourceAvailableException extends 
JobException {
                super("No resource available to schedule unit " + unit
                                + ". You can decrease the operator parallelism 
or increase the number of slots per TaskManager in the configuration.");
        }
-       
+
        public NoResourceAvailableException(int numInstances, int 
numSlotsTotal, int availableSlots) {
                super(String.format("%s Resources available to scheduler: 
Number of instances=%d, total number of slots=%d, available slots=%d",
                                BASE_MESSAGE, numInstances, numSlotsTotal, 
availableSlots));

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
index 05f3ed9..23f1604 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
@@ -33,8 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.flink.util.StringUtils;
 
-import com.google.common.base.Preconditions;
-
 public class LogfileInfoServlet extends HttpServlet {
 
        private static final long serialVersionUID = 1L;
@@ -48,7 +46,9 @@ public class LogfileInfoServlet extends HttpServlet {
 
 
        public LogfileInfoServlet(File[] logDirs) {
-               Preconditions.checkNotNull(logDirs, "The given log files are 
null.");
+               if(logDirs == null){
+                       throw new NullPointerException("The given log files are 
null.");
+               }
                this.logDirs = logDirs;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 5b6259a..715515e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
@@ -39,7 +40,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public final class Task {
+public class Task {
 
        /** For atomic state updates */
        private static final AtomicReferenceFieldUpdater<Task, ExecutionState> 
STATE_UPDATER =
@@ -62,7 +63,7 @@ public final class Task {
 
        private final String taskName;
 
-       private final TaskManager taskManager;
+       private final ActorRef taskManager;
 
        private final List<ActorRef> executionListenerActors = new 
CopyOnWriteArrayList<ActorRef>();
 
@@ -75,7 +76,7 @@ public final class Task {
        // 
--------------------------------------------------------------------------------------------
 
 
        public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int 
parallelism,
-                               ExecutionAttemptID executionId, String 
taskName, TaskManager taskManager) {
+                       ExecutionAttemptID executionId, String taskName, 
ActorRef taskManager) {
 
                this.jobId = jobId;
                this.vertexId = vertexId;
@@ -164,7 +165,7 @@ public final class Task {
        public boolean markAsFinished() {
                if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
                        notifyObservers(ExecutionState.FINISHED, null);
-                       taskManager.notifyExecutionStateChange(jobId, 
executionId, ExecutionState.FINISHED, null);
+                       notifyExecutionStateChange(ExecutionState.FINISHED, 
null);
                        return true;
                }
                else {
@@ -186,7 +187,7 @@ public final class Task {
                        // message back
                        else if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
                                notifyObservers(ExecutionState.FAILED, 
ExceptionUtils.stringifyException(error));
-                               taskManager.notifyExecutionStateChange(jobId, 
executionId, ExecutionState.FAILED, error);
+                               
notifyExecutionStateChange(ExecutionState.FAILED, error);
                                return;
                        }
                }
@@ -208,7 +209,7 @@ public final class Task {
                                if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.CANCELED)) {
 
                                        
notifyObservers(ExecutionState.CANCELED, null);
-                                       
taskManager.notifyExecutionStateChange(jobId, executionId, 
ExecutionState.CANCELED, null);
+                                       
notifyExecutionStateChange(ExecutionState.CANCELED, null);
                                        return;
                                }
                        }
@@ -260,7 +261,7 @@ public final class Task {
                                if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
 
                                        notifyObservers(ExecutionState.FAILED, 
null);
-                                       
taskManager.notifyExecutionStateChange(jobId, executionId, 
ExecutionState.FAILED, cause);
+                                       
notifyExecutionStateChange(ExecutionState.FAILED, cause);
                                        return;
                                }
                        }
@@ -275,7 +276,7 @@ public final class Task {
                                        }
 
                                        notifyObservers(ExecutionState.FAILED, 
null);
-                                       
taskManager.notifyExecutionStateChange(jobId, executionId, 
ExecutionState.FAILED, cause);
+                                       
notifyExecutionStateChange(ExecutionState.FAILED, cause);
 
                                        return;
                                }
@@ -299,7 +300,7 @@ public final class Task {
 
                        if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.CANCELED)) {
                                notifyObservers(ExecutionState.CANCELED, null);
-                               taskManager.notifyExecutionStateChange(jobId, 
executionId, ExecutionState.CANCELED, null);
+                               
notifyExecutionStateChange(ExecutionState.CANCELED, null);
                                return;
                        }
                }
@@ -329,6 +330,15 @@ public final class Task {
                }
        }
 
+       protected void notifyExecutionStateChange(ExecutionState executionState,
+                                                                               
        Throwable optionalError) {
+               LOG.info("Update execution state to " + executionState);
+               taskManager.tell(new 
JobManagerMessages.UpdateTaskExecutionState(
+                               new TaskExecutionState(jobId, executionId, 
executionState, optionalError)),
+                               ActorRef.noSender());
+
+       }
+
        // 
-----------------------------------------------------------------------------------------------------------------
        //                                        Task Profiling
        // 
-----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index c8ec180..14f0ab0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -21,6 +21,9 @@ package org.apache.flink.runtime
 import _root_.akka.actor.Actor
 import _root_.akka.event.LoggingAdapter
 
+/**
+ * Mixin to add debug message logging
+ */
 trait ActorLogMessages {
   self: Actor =>
 
@@ -34,14 +37,14 @@ trait ActorLogMessages {
         _receiveWithLogMessages(x)
       }
       else {
-        log.debug(s"Received message ${x} from ${self.sender}.")
+        log.debug(s"Received message $x from ${self.sender}.")
         
         val start = System.nanoTime()
         
         _receiveWithLogMessages(x)
         
         val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message ${x} in ${duration} ms from 
${self.sender}.")
+        log.debug(s"Handled message $x in $duration ms from ${self.sender}.")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index e6e865e..afbb6dd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -28,40 +28,154 @@ import org.apache.flink.configuration.{ConfigConstants, 
Configuration}
 import org.slf4j.LoggerFactory
 import scala.concurrent.{ExecutionContext, Future, Await}
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
+/**
+ * This class contains utility functions for akka. It contains methods to 
start an actor system with
+ * a given akka configuration. Furthermore, the akka configuration used for 
starting the different
+ * actor systems resides in this class.
+ */
 object AkkaUtils {
   val LOG = LoggerFactory.getLogger(AkkaUtils.getClass)
 
-  val DEFAULT_TIMEOUT: FiniteDuration =
-    FiniteDuration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS)
-
   val INF_TIMEOUT = 21474835 seconds
 
   var globalExecutionContext: ExecutionContext = ExecutionContext.global
 
-  def createActorSystem(host: String, port: Int, configuration: 
Configuration): ActorSystem = {
-    val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(host, 
port, configuration))
+  /**
+   * Creates an actor system. If a listening address is specified, then the 
actor system will listen
+   * on that address for messages from a remote actor system. If not, then a 
local actor system
+   * will be instantiated.
+   *
+   * @param configuration instance containing the user provided configuration 
values
+   * @param listeningAddress an optional tuple containing a hostname and a 
port to bind to. If the
+   *                         parameter is None, then a local actor system will 
be created.
+   * @return created actor system
+   */
+  def createActorSystem(configuration: Configuration,
+                        listeningAddress: Option[(String, Int)]): ActorSystem 
= {
+    val akkaConfig = getAkkaConfig(configuration, listeningAddress)
     createActorSystem(akkaConfig)
   }
 
-  def createActorSystem(): ActorSystem = {
-    createActorSystem(getDefaultActorSystemConfig)
+  /**
+   * Creates an actor system with the given akka config.
+   *
+   * @param akkaConfig configuration for the actor system
+   * @return created actor system
+   */
+  def createActorSystem(akkaConfig: Config): ActorSystem = {
+    ActorSystem.create("flink", akkaConfig)
   }
 
-  def createLocalActorSystem(): ActorSystem = {
-    createActorSystem(getDefaultLocalActorSystemConfig)
+  /**
+   * Creates an actor system with the default config and listening on a random 
port of the
+   * localhost.
+   *
+   * @return default actor system listening on a random port of the localhost
+   */
+  def createDefaultActorSystem(): ActorSystem = {
+    createActorSystem(getDefaultAkkaConfig)
   }
 
-  def createActorSystem(akkaConfig: Config): ActorSystem = {
-    if(LOG.isDebugEnabled) {
-      LOG.debug(s"Using akka config to create actor system: $akkaConfig")
+  /**
+   * Creates an akka config with the provided configuration values. If the 
listening address is
+   * specified, then the actor system will listen on the respective address.
+   *
+   * @param configuration instance containing the user provided configuration 
values
+   * @param listeningAddress optional tuple of hostname and port to listen on. 
If None is given,
+   *                         then an Akka config for local actor system will 
be returned
+   * @return Akka config
+   */
+  def getAkkaConfig(configuration: Configuration,
+                    listeningAddress: Option[(String, Int)]): Config = {
+    val defaultConfig = getBasicAkkaConfig(configuration)
+
+    listeningAddress match {
+      case Some((hostname, port)) =>
+        val remoteConfig = getRemoteAkkaConfig(configuration, hostname, port)
+        remoteConfig.withFallback(defaultConfig)
+      case None =>
+        defaultConfig
     }
-    ActorSystem.create("flink", akkaConfig)
   }
 
-  def getConfigString(host: String, port: Int, configuration: Configuration): 
String = {
+  /**
+   * Creates the default akka configuration which listens on a random port on 
the local machine.
+   * All configuration values are set to default values.
+   *
+   * @return Flink's Akka default config
+   */
+  def getDefaultAkkaConfig: Config = {
+    getAkkaConfig(new Configuration(), Some(("", 0)))
+  }
+
+  /**
+   * Gets the basic Akka config which is shared by remote and local actor 
systems.
+   *
+   * @param configuration instance which contains the user specified values 
for the configuration
+   * @return Flink's basic Akka config
+   */
+  private def getBasicAkkaConfig(configuration: Configuration): Config = {
+    val akkaThroughput = 
configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
+      ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
+    val lifecycleEvents = 
configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
+      ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+
+    val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
+
+    val logLevel = getLogLevel
+
+    val config =
+      s"""
+        |akka {
+        | daemonic = on
+        |
+        | loggers = ["akka.event.slf4j.Slf4jLogger"]
+        | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+        | log-config-on-start = off
+        |
+        | jvm-exit-on-fatal-error = off
+        |
+        | serialize-messages = off
+        |
+        | loglevel = $logLevel
+        | stdout-loglevel = $logLevel
+        |
+        | log-dead-letters = $logLifecycleEvents
+        | log-dead-letters-during-shutdown = $logLifecycleEvents
+        |
+        | actor {
+        |   default-dispatcher {
+        |     throughput = $akkaThroughput
+        |
+        |     fork-join-executor {
+        |       parallelism-factor = 2.0
+        |     }
+        |   }
+        | }
+        |}
+      """.stripMargin
+
+    ConfigFactory.parseString(config)
+  }
+
+  /**
+   * Creates an Akka config for a remote actor system listening on port on the 
network interface
+   * identified by hostname.
+   *
+   * @param configuration instance containing the user provided configuration 
values
+   * @param hostname of the network interface to listen on
+   * @param port to bind to or if 0 then Akka picks a free port automatically
+   * @return Flink's Akka configuration for remote actor systems
+   */
+  private def getRemoteAkkaConfig(configuration: Configuration,
+                                  hostname: String, port: Int): Config = {
+    val akkaAskTimeout = 
Duration(configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+
     val startupTimeout = 
configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_STARTUP_TIMEOUT)
+      akkaAskTimeout.toString)
     val transportHeartbeatInterval = configuration.getString(ConfigConstants.
       AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
       ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL)
@@ -71,13 +185,13 @@ object AkkaUtils {
     val transportThreshold = 
configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
     val watchHeartbeatInterval = configuration.getString(ConfigConstants
-      .AKKA_WATCH_HEARTBEAT_INTERVAL, 
ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL)
+      .AKKA_WATCH_HEARTBEAT_INTERVAL, (akkaAskTimeout/10).toString)
     val watchHeartbeatPause = 
configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-      ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE)
+      akkaAskTimeout.toString)
     val watchThreshold = 
configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
     val akkaTCPTimeout = 
configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_TCP_TIMEOUT)
+      akkaAskTimeout.toString)
     val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE,
       ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
     val akkaThroughput = 
configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
@@ -87,11 +201,14 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
+    val logLevel = getLogLevel
+
     val configString =
       s"""
          |akka {
-         |  log-dead-letters = $logLifecycleEvents
-         |  log-dead-letters-during-shutdown = $logLifecycleEvents
+         |  actor {
+         |    provider = "akka.remote.RemoteActorRefProvider"
+         |  }
          |
          |  remote {
          |    startup-timeout = $startupTimeout
@@ -108,110 +225,40 @@ object AkkaUtils {
          |      threshold = $watchThreshold
          |    }
          |
-         |    netty{
-         |      tcp{
-         |        hostname = $host
+         |    netty {
+         |      tcp {
+         |        transport-class = 
"akka.remote.transport.netty.NettyTransport"
          |        port = $port
          |        connection-timeout = $akkaTCPTimeout
-         |        maximum-frame-size = ${akkaFramesize}
+         |        maximum-frame-size = $akkaFramesize
+         |        tcp-nodelay = on
          |      }
          |    }
          |
          |    log-remote-lifecycle-events = $logLifecycleEvents
          |  }
-         |
-         |  actor{
-         |    default-dispatcher{
-         |
-         |      throughput = ${akkaThroughput}
-         |
-         |      fork-join-executor {
-         |        parallelism-factor = 2.0
-         |      }
-         |    }
-         |  }
-         |
          |}
        """.stripMargin
 
-    getDefaultActorSystemConfigString + configString
-  }
-
-  def getLocalConfigString(configuration: Configuration): String = {
-    val akkaThroughput = 
configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
-      ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
-    val lifecycleEvents = 
configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
-      ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
-
-    val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
-
-    val configString =
-      s"""
-         |akka {
-         |  log-dead-letters = $logLifecycleEvents
-         |  log-dead-letters-during-shutdown = $logLifecycleEvents
-         |
-         |  actor{
-         |    default-dispatcher{
-         |
-         |      throughput = ${akkaThroughput}
-         |
-         |      fork-join-executor {
-         |        parallelism-factor = 2.0
-         |      }
-         |    }
-         |  }
-         |
-         |}
-       """.stripMargin
-
-    getDefaultLocalActorSystemConfigString + configString
-  }
-
-  def getDefaultActorSystemConfigString: String = {
-    val config = """
-       |akka {
-       |  actor {
-       |    provider = "akka.remote.RemoteActorRefProvider"
-       |  }
-       |
-       |  remote{
-       |    netty{
-       |      tcp{
-       |        port = 0
-       |        transport-class = "akka.remote.transport.netty.NettyTransport"
-       |        tcp-nodelay = on
-       |        maximum-frame-size = 1MB
-       |        execution-pool-size = 4
-       |      }
-       |    }
-       |  }
-       |}
-     """.stripMargin
-
-    getDefaultLocalActorSystemConfigString + config
-  }
+      val hostnameConfigString = if(hostname != null && hostname.nonEmpty){
+        s"""
+           |akka {
+           |  remote {
+           |    netty {
+           |      tcp {
+           |        hostname = $hostname
+           |      }
+           |    }
+           |  }
+           |}
+         """.stripMargin
+      }else{
+        // if hostname is null or empty, then leave hostname unspecified. Akka 
will pick
+        // InetAddress.getLocalHost.getHostAddress
+        ""
+      }
 
-  def getDefaultLocalActorSystemConfigString: String = {
-    val logLevel = getLogLevel
-    s"""
-      |akka {
-      |  daemonic = on
-      |
-      |  loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |  logger-startup-timeout = 30s
-      |  loglevel = ${logLevel}
-      |  stdout-loglevel = "WARNING"
-      |  jvm-exit-on-fatal-error = off
-      |  log-config-on-start = off
-      |
-      |  serialize-messages = on
-      |
-      |  debug {
-      |   lifecycle = on
-      |  }
-      |}
-    """.stripMargin
+    ConfigFactory.parseString(configString + hostnameConfigString)
   }
 
   def getLogLevel: String = {
@@ -234,14 +281,6 @@ object AkkaUtils {
     }
   }
 
-  def getDefaultActorSystemConfig = {
-    ConfigFactory.parseString(getDefaultActorSystemConfigString)
-  }
-
-  def getDefaultLocalActorSystemConfig = {
-    ConfigFactory.parseString(getDefaultLocalActorSystemConfigString)
-  }
-
   def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, 
timeout:
   FiniteDuration): ActorRef = {
     Await.result(system.actorSelection(parent.path / 
child).resolveOne()(timeout), timeout)
@@ -265,6 +304,16 @@ object AkkaUtils {
     Await.result(future, timeout).asInstanceOf[T]
   }
 
+  /**
+   * Utility function to construct a future which tries multiple times to 
execute itself if it
+   * fails. If the maximum number of tries are exceeded, then the future fails.
+   *
+   * @param body function describing the future action
+   * @param tries number of maximum tries before the future fails
+   * @param executionContext which shall execute the future
+   * @tparam T return type of the future
+   * @return future which tries to recover by re-executing itself a given 
number of times
+   */
   def retry[T](body: => T, tries: Int)(implicit executionContext: 
ExecutionContext): Future[T] = {
     Future{ body }.recoverWith{
       case t:Throwable =>
@@ -276,11 +325,32 @@ object AkkaUtils {
     }
   }
 
+  /**
+   * Utility function to construct a future which tries multiple times to 
execute itself if it
+   * fails. If the maximum number of tries are exceeded, then the future fails.
+   *
+   * @param callable future action
+   * @param tries maximum number of tries before the future fails
+   * @param executionContext which shall execute the future
+   * @tparam T return type of the future
+   * @return future which tries to recover by re-executing itself a given 
number of times
+   */
   def retry[T](callable: Callable[T], tries: Int)(implicit executionContext: 
ExecutionContext):
   Future[T] = {
     retry(callable.call(), tries)
   }
 
+  /**
+   * Utility function to construct a future which tries multiple times to 
execute itself if it
+   * fails. If the maximum number of tries are exceeded, then the future fails.
+   *
+   * @param target actor which receives the message
+   * @param message to be sent to the target actor
+   * @param tries maximum number of tries before the future fails
+   * @param executionContext which shall execute the future
+   * @param timeout of the future
+   * @return future which tries to recover by re-executing itself a given 
number of times
+   */
   def retry(target: ActorRef, message: Any, tries: Int)(implicit 
executionContext:
   ExecutionContext, timeout: FiniteDuration): Future[Any] = {
     (target ? message)(timeout) recoverWith{
@@ -292,4 +362,17 @@ object AkkaUtils {
         }
     }
   }
+
+  def getTimeout(config: Configuration): FiniteDuration = {
+    val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+
+    new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
+  }
+
+  def getDefaultTimeout: FiniteDuration = {
+    val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+
+    new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 195a0b6..676ddda 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.client
 
 import java.io.IOException
 import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -35,45 +34,58 @@ import 
org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
 import scala.concurrent.{TimeoutException, Await}
-import scala.concurrent.duration.{FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
 
-
-class JobClient(jobManagerURL: String, timeout: FiniteDuration)
-  extends Actor with ActorLogMessages with  ActorLogging{
+/**
+ * Actor which constitutes the bridge between the non-actor code and the 
JobManager. The JobClient
+ * is used to submit jobs to the JobManager and to request the port of the 
BlobManager.
+ *
+ * @param jobManagerURL Akka URL of the JobManager
+ * @param timeout Timeout used for futures
+ */
+class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends
+Actor with ActorLogMessages with ActorLogging {
   import context._
 
   val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
-      jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = 
true), sender)
+      jobManager forward SubmitJob(jobGraph, registerForEvents = false, 
detached = true)
     case cancelJob: CancelJob =>
       jobManager forward cancelJob
     case SubmitJobAndWait(jobGraph, listen) =>
       val listener = context.actorOf(Props(classOf[JobClientListener], sender))
-      jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detach = 
false), listener)
+      jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detached 
= false), listener)
     case RequestBlobManagerPort =>
       jobManager forward RequestBlobManagerPort
-    case RequestJobManagerStatus => {
+    case RequestJobManagerStatus =>
       jobManager forward RequestJobManagerStatus
-    }
   }
 }
 
-class JobClientListener(client: ActorRef) extends Actor with ActorLogMessages 
with ActorLogging {
+/**
+ * Helper actor which listens to status messages from the JobManager and 
prints them on the
+ * standard output. Such an actor is started for each job, which is configured 
to listen to these
+ * status messages.
+ *
+ * @param jobSubmitter Akka URL of the sender of the job
+ */
+class JobClientListener(jobSubmitter: ActorRef) extends Actor with 
ActorLogMessages with
+ActorLogging {
   override def receiveWithLogMessages: Receive = {
     case SubmissionFailure(_, t) =>
-      client ! Failure(t)
+      jobSubmitter ! Failure(t)
       self ! PoisonPill
     case SubmissionSuccess(_) =>
     case JobResultSuccess(_, duration, accumulatorResults) =>
-      client ! new JobExecutionResult(duration, accumulatorResults)
+      jobSubmitter ! new JobExecutionResult(duration, accumulatorResults)
       self ! PoisonPill
     case JobResultCanceled(_, msg) =>
-      client ! Failure(new JobExecutionException(msg, true))
+      jobSubmitter ! Failure(new JobExecutionException(msg, true))
       self ! PoisonPill
     case JobResultFailed(_, msg) =>
-      client ! Failure(new JobExecutionException(msg, false))
+      jobSubmitter ! Failure(new JobExecutionException(msg, false))
       self ! PoisonPill
     case msg =>
       // we have to use System.out.println here to avoid erroneous behavior 
for output redirection
@@ -81,14 +93,20 @@ class JobClientListener(client: ActorRef) extends Actor 
with ActorLogMessages wi
   }
 }
 
+/**
+ * JobClient's companion object containing convenience functions to start a 
JobClient actor, parse
+ * the configuration to extract the JobClient's settings and convenience 
functions to submit jobs.
+ */
 object JobClient{
   val JOB_CLIENT_NAME = "jobclient"
 
-  def startActorSystemAndActor(config: Configuration): (ActorSystem, ActorRef) 
= {
-    implicit val actorSystem = AkkaUtils.createActorSystem(host = "localhost",
-      port =0, configuration = config)
+  def startActorSystemAndActor(config: Configuration, localActorSystem: 
Boolean):
+  (ActorSystem, ActorRef) = {
+    // start a remote actor system to listen on an arbitrary port
+    implicit val actorSystem = AkkaUtils.createActorSystem(configuration = 
config,
+      listeningAddress = Some(("", 0)))
 
-    (actorSystem, startActorWithConfiguration(config))
+    (actorSystem, startActorWithConfiguration(config, localActorSystem))
   }
 
   def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, 
timeout: FiniteDuration):
@@ -96,66 +114,98 @@ object JobClient{
     actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), 
JOB_CLIENT_NAME)
   }
 
-  def parseConfiguration(configuration: Configuration): String = {
-    configuration.getString(ConfigConstants.JOB_MANAGER_AKKA_URL, null) match {
-      case url: String => url
-      case _ =>
-        val jobManagerAddress = configuration.getString(ConfigConstants
+  def startActorWithConfiguration(config: Configuration, localActorSystem: 
Boolean)
+                                 (implicit actorSystem: ActorSystem): ActorRef 
= {
+    implicit val timeout = AkkaUtils.getTimeout(config)
+
+    startActor(parseConfiguration(config, localActorSystem))
+  }
+
+  /**
+   * Extracts the JobManager's Akka URL from the configuration. If 
localActorSystem is true, then
+   * the JobClient is executed in the same actor system as the JobManager. 
Thus, they can
+   * communicate locally.
+   *
+   * @param configuration Configuration object containing all user provided 
configuration values
+   * @param localActorSystem  true if the JobClient runs in the same actor 
system as the JobManager,
+   *                          otherwise false
+   * @return Akka URL of the JobManager
+   */
+  def parseConfiguration(configuration: Configuration, localActorSystem: 
Boolean): String = {
+    if(localActorSystem){
+      // JobManager and JobClient run in the same ActorSystem
+      JobManager.getLocalAkkaURL
+    }else{
+      val jobManagerAddress = configuration.getString(ConfigConstants
           .JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val jobManagerRPCPort = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      val jobManagerRPCPort = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        if (jobManagerAddress == null) {
-          throw new RuntimeException("JobManager address has not been 
specified in the " +
-            "configuration.")
-        }
+      if (jobManagerAddress == null) {
+        throw new RuntimeException("JobManager address has not been specified 
in the " +
+          "configuration.")
+      }
 
-        JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + 
jobManagerRPCPort)
+      JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
     }
   }
 
-  def startActorWithConfiguration(config: Configuration)(implicit actorSystem: 
ActorSystem):
-  ActorRef = {
-    implicit val timeout = 
FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
-
-    startActor(parseConfiguration(config))
-  }
-
+  /**
+   * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which 
submits it then to
+   * the JobManager. The method blocks until the job has finished or the 
JobManager is no longer
+   * alive. In the former case, the [[JobExecutionResult]] is returned and in 
the latter case a
+   * [[JobExecutionException]] is thrown.
+   *
+   * @param jobGraph JobGraph describing the Flink job
+   * @param listenToStatusEvents true if the JobClient shall print status 
events of the
+   *                             corresponding job, otherwise false
+   * @param jobClient ActorRef to the JobClient
+   * @param timeout Timeout for futures
+   * @throws org.apache.flink.runtime.client.JobExecutionException
+   * @return The job execution result
+   */
   @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph, listen: Boolean, jobClient: 
ActorRef)
+  def submitJobAndWait(jobGraph: JobGraph, listenToStatusEvents: Boolean, 
jobClient: ActorRef)
                       (implicit timeout: FiniteDuration): JobExecutionResult = 
{
+
     var waitForAnswer = true
     var answer: JobExecutionResult = null
 
-    val result =
-      (jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = 
listen))(AkkaUtils.INF_TIMEOUT).
-        mapTo[JobExecutionResult]
+    val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = 
listenToStatusEvents))(
+      AkkaUtils.INF_TIMEOUT).mapTo[JobExecutionResult]
 
     while(waitForAnswer) {
       try {
         answer = Await.result(result, timeout)
         waitForAnswer = false
       } catch {
-        case x: TimeoutException => {
+        case x: TimeoutException =>
           val jmStatus = (jobClient ? 
RequestJobManagerStatus)(timeout).mapTo[JobManagerStatus]
 
           try {
             Await.result(jmStatus, timeout)
           } catch {
-            case t: Throwable => {
+            case t: Throwable =>
               throw new JobExecutionException("JobManager not reachable 
anymore. Terminate " +
                 "waiting for job answer.", false)
-            }
           }
-        }
       }
     }
 
     answer
   }
 
-
+  /**
+   * Submits a job in detached mode. The method sends the corresponding 
[[JobGraph]] to the
+   * JobClient specified by jobClient. The JobClient does not start a 
[[JobClientListener]] and
+   * simply returns the [[SubmissionResponse]] of the [[JobManager]]. The 
SubmissionResponse is
+   * then returned by this method.
+   *
+   * @param jobGraph Flink job
+   * @param jobClient ActorRef to the JobClient
+   * @param timeout Timeout for futures
+   * @return The submission response
+   */
   def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit 
timeout: FiniteDuration):
   SubmissionResponse = {
     val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
@@ -163,6 +213,17 @@ object JobClient{
     Await.result(response.mapTo[SubmissionResponse],timeout)
   }
 
+  /**
+   * Uploads the specified jar files of the [[JobGraph]] jobGraph to the 
BlobServer of the
+   * JobManager. The respective port is retrieved from the JobManager.
+   *
+   * @param jobGraph Flink job containing the information about the required 
jars
+   * @param hostname Hostname of the instance on which the BlobServer and also 
the JobManager run
+   * @param jobClient ActorRef to the JobClient
+   * @param timeout Timeout for futures
+   * @throws java.io.IOException
+   * @return
+   */
   @throws(classOf[IOException])
   def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: 
ActorRef)(implicit timeout:
    FiniteDuration): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 8f5d4d4..128454a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -20,9 +20,18 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorRef
 
-class JobInfo(val client: ActorRef,val start: Long){
+/**
+ * Utility class to store job information on the [[JobManager]]. The JobInfo 
stores which actor
+ * submitted the job, the start time and, if the job has already terminated, 
the end time.
+ * Additionally, it stores whether the job was started in the detached mode. 
Detached means that
+ * the submitting actor does not wait for the job result once the job has 
terminated.
+ *
+ * @param client Actor which submitted the job
+ * @param start Starting time
+ */
+class JobInfo(val client: ActorRef, val start: Long){
   var end: Long = -1
-  var detach: Boolean = false
+  var detached: Boolean = false
 
   def duration: Long = {
     if(end != -1){

Reply via email to