Use Akka version 2.2.1 for Hadoop version 2.0.0-alpha to resolve dependency 
conflicts. Adjust code to comply with the respective Akka API. Remove obsolete TODO.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f5618fa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f5618fa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f5618fa6

Branch: refs/heads/master
Commit: f5618fa6608000befce8d4d7a14d1bc6737833e9
Parents: f32c475
Author: Till Rohrmann <[email protected]>
Authored: Wed Dec 17 14:49:26 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:32 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnJobManager.scala  |  2 +-
 .../flink/client/CliFrontendListCancelTest.java |  1 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  2 -
 .../apache/flink/runtime/client/JobClient.scala |  4 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 52 ++++++++++----------
 .../runtime/jobmanager/MemoryArchivist.scala    | 10 ++--
 .../flink/runtime/taskmanager/TaskManager.scala | 14 +++---
 .../taskmanager/TaskManagerProfiler.scala       |  4 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  4 +-
 .../testingUtils/TestingJobManager.scala        | 16 +++---
 .../testingUtils/TestingMemoryArchivist.scala   |  4 +-
 .../testingUtils/TestingTaskManager.scala       | 14 +++---
 .../runtime/testingUtils/TestingUtils.scala     |  1 -
 pom.xml                                         | 11 +++--
 14 files changed, 67 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
 
b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a1bed1b..af08f7b 100644
--- 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ 
b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -91,7 +91,7 @@ trait YarnJobManager extends ActorLogMessages {
       context.system.shutdown()
 
     case RegisterMessageListener =>
-      messageListener = Some(sender())
+      messageListener = Some(sender)
 
     case StartYarnSession(conf) => {
       log.info("Start yarn session.")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 8b10ee2..b9af927 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -35,7 +35,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-//TODO: Update test case
 public class CliFrontendListCancelTest {
 
        private static ActorSystem actorSystem;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 97ce343..5f6d59a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -121,7 +121,6 @@ object AkkaUtils {
          |
          |  actor{
          |    default-dispatcher{
-         |      executor = "default-executor"
          |
          |      throughput = ${akkaThroughput}
          |
@@ -159,7 +158,6 @@ object AkkaUtils {
          |
          |  actor{
          |    default-dispatcher{
-         |      executor = "default-executor"
          |
          |      throughput = ${akkaThroughput}
          |

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 9377069..9a37cc0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -47,11 +47,11 @@ with  ActorLogging{
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
-      jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = 
true), sender())
+      jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = 
true), sender)
     case cancelJob: CancelJob =>
       jobManager forward cancelJob
     case SubmitJobAndWait(jobGraph, listen) =>
-      val listener = context.actorOf(Props(classOf[JobClientListener], 
sender()))
+      val listener = context.actorOf(Props(classOf[JobClientListener], sender))
       jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detach = 
false), listener)
     case RequestBlobManagerPort =>
       jobManager forward RequestBlobManagerPort

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 17f123a..0dbbd92 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -103,7 +103,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
   override def receiveWithLogMessages: Receive = {
     case RegisterTaskManager(connectionInfo, hardwareInformation, 
numberOfSlots) => {
-      val taskManager = sender()
+      val taskManager = sender
       val instanceID = instanceManager.registerTaskManager(taskManager, 
connectionInfo,
         hardwareInformation, numberOfSlots)
 
@@ -114,17 +114,17 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     }
 
     case RequestNumberRegisteredTaskManager => {
-      sender() ! instanceManager.getNumberOfRegisteredTaskManagers
+      sender ! instanceManager.getNumberOfRegisteredTaskManagers
     }
 
     case RequestTotalNumberOfSlots => {
-      sender() ! instanceManager.getTotalNumberOfSlots
+      sender ! instanceManager.getTotalNumberOfSlots
     }
 
     case SubmitJob(jobGraph, listenToEvents, detach) => {
       try {
         if (jobGraph == null) {
-          sender() ! akka.actor.Status.Failure(new 
IllegalArgumentException("JobGraph must not be" +
+          sender ! akka.actor.Status.Failure(new 
IllegalArgumentException("JobGraph must not be" +
             " null."))
         } else {
 
@@ -138,7 +138,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           val (executionGraph, jobInfo) = 
currentJobs.getOrElseUpdate(jobGraph.getJobID(),
             (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
               jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, 
userCodeLoader),
-              JobInfo(sender(), System.currentTimeMillis())))
+              JobInfo(sender, System.currentTimeMillis())))
 
           val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
             jobGraph.getNumberOfExecutionRetries
@@ -190,8 +190,8 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
           if(listenToEvents){
             // the sender will be notified about state changes
-            executionGraph.registerExecutionListener(sender())
-            executionGraph.registerJobStatusListener(sender())
+            executionGraph.registerExecutionListener(sender)
+            executionGraph.registerJobStatusListener(sender)
           }
 
           jobInfo.detach = detach
@@ -200,7 +200,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
           executionGraph.scheduleForExecution(scheduler)
 
-          sender() ! SubmissionSuccess(jobGraph.getJobID)
+          sender ! SubmissionSuccess(jobGraph.getJobID)
         }
       } catch {
         case t: Throwable =>
@@ -226,7 +226,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
           }
 
-          sender() ! SubmissionFailure(jobGraph.getJobID, t)
+          sender ! SubmissionFailure(jobGraph.getJobID, t)
       }
     }
 
@@ -238,27 +238,27 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           Future {
             executionGraph.cancel()
           }
-          sender() ! CancellationSuccess(jobID)
+          sender ! CancellationSuccess(jobID)
         case None =>
           log.info(s"No job found with ID ${jobID}.")
-          sender() ! CancellationFailure(jobID, new 
IllegalArgumentException(s"No job found with " +
+          sender ! CancellationFailure(jobID, new 
IllegalArgumentException(s"No job found with " +
             s"ID ${jobID}."))
       }
     }
 
     case UpdateTaskExecutionState(taskExecutionState) => {
       if(taskExecutionState == null){
-        sender() ! false
+        sender ! false
       }else {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
-            val originalSender = sender()
+            val originalSender = sender
             Future {
               originalSender ! executionGraph.updateState(taskExecutionState)
             }
           case None => log.error(s"Cannot find execution graph for ID 
${taskExecutionState
             .getJobID} to change state to 
${taskExecutionState.getExecutionState}.")
-            sender() ! false
+            sender ! false
         }
       }
     }
@@ -300,7 +300,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
       if(log.isDebugEnabled) {
         log.debug(s"Send next input split ${nextInputSplit}.")
       }
-      sender() ! NextInputSplit(nextInputSplit)
+      sender ! NextInputSplit(nextInputSplit)
     }
 
     case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => {
@@ -342,7 +342,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
       currentJobs.get(jobID) match {
         case Some(_) =>
           val listeners = finalJobStatusListener.getOrElse(jobID, Set())
-          finalJobStatusListener += jobID -> (listeners + sender())
+          finalJobStatusListener += jobID -> (listeners + sender)
         case None =>
           archive ! RequestJobStatus(jobID)
       }
@@ -351,7 +351,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     case LookupConnectionInformation(connectionInformation, jobID, 
sourceChannelID) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
-          val originalSender = sender()
+          val originalSender = sender
           Future {
             originalSender ! ConnectionInformation(
               executionGraph.lookupConnectionInfoAndDeployReceivers
@@ -359,7 +359,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
           }
         case None =>
           log.error(s"Cannot find execution graph for job ID ${jobID}.")
-          sender() ! 
ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
+          sender ! 
ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
       }
     }
 
@@ -371,14 +371,14 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
 
     case RequestAccumulatorResults(jobID) => {
       import scala.collection.JavaConverters._
-      sender() ! AccumulatorResultsFound(jobID, 
accumulatorManager.getJobAccumulatorResults
+      sender ! AccumulatorResultsFound(jobID, 
accumulatorManager.getJobAccumulatorResults
         (jobID).asScala.toMap)
     }
 
     case RequestJobStatus(jobID) => {
       currentJobs.get(jobID) match {
-        case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, 
executionGraph.getState)
-        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo 
sender()
+        case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, 
executionGraph.getState)
+        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender
       }
     }
 
@@ -387,23 +387,23 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
         case (_, (eg, jobInfo)) => eg
       }
 
-      sender() ! RunningJobs(executionGraphs)
+      sender ! RunningJobs(executionGraphs)
     }
 
     case RequestJob(jobID) => {
       currentJobs.get(jobID) match {
-        case Some((eg, _)) => sender() ! JobFound(jobID, eg)
-        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender()
+        case Some((eg, _)) => sender ! JobFound(jobID, eg)
+        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender
       }
     }
 
     case RequestBlobManagerPort => {
-      sender() ! libraryCacheManager.getBlobServerPort
+      sender ! libraryCacheManager.getBlobServerPort
     }
 
     case RequestRegisteredTaskManagers => {
       import scala.collection.JavaConverters._
-      sender() ! 
RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
+      sender ! 
RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
     }
 
     case Heartbeat(instanceID) => {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index e63cf68..60aa4b1 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -41,20 +41,20 @@ ActorLogging {
     }
 
     case RequestArchivedJobs => {
-      sender() ! ArchivedJobs(graphs.values)
+      sender ! ArchivedJobs(graphs.values)
     }
 
     case RequestJob(jobID) => {
       graphs.get(jobID) match {
-        case Some(graph) => sender() ! JobFound(jobID, graph)
-        case None => sender() ! JobNotFound(jobID)
+        case Some(graph) => sender ! JobFound(jobID, graph)
+        case None => sender ! JobNotFound(jobID)
       }
     }
 
     case RequestJobStatus(jobID) => {
       graphs.get(jobID) match {
-        case Some(eg) => sender() ! CurrentJobStatus(jobID, eg.getState)
-        case None => sender() ! JobNotFound(jobID)
+        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
+        case None => sender ! JobNotFound(jobID)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a4040fa..bb4a241 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -181,7 +181,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
     case AcknowledgeRegistration(id, blobPort) => {
       if (!registered) {
         registered = true
-        currentJobManager = sender()
+        currentJobManager = sender
         instanceID = id
 
         context.watch(currentJobManager)
@@ -216,9 +216,9 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
           Future {
             task.cancelExecution()
           }
-          sender() ! new TaskOperationResult(executionID, true)
+          sender ! new TaskOperationResult(executionID, true)
         case None =>
-          sender() ! new TaskOperationResult(executionID, false, "No task with 
that execution ID " +
+          sender ! new TaskOperationResult(executionID, false, "No task with 
that execution ID " +
             "was " +
             "found.")
       }
@@ -295,7 +295,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
           throw new RuntimeException("Cannot start task. Task was canceled or 
failed.")
         }
 
-        sender() ! TaskOperationResult(executionID, true)
+        sender ! TaskOperationResult(executionID, true)
       } catch {
         case t: Throwable =>
           log.error(t, s"Could not instantiate task with execution ID 
${executionID}.")
@@ -316,7 +316,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
             }
           }
 
-          sender() ! new TaskOperationResult(executionID, false,
+          sender ! new TaskOperationResult(executionID, false,
             ExceptionUtils.stringifyException(t))
       }
     }
@@ -343,8 +343,8 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 
     case NotifyWhenRegisteredAtJobManager => {
       registered match {
-        case true => sender() ! RegisteredAtJobManager
-        case false => waitForRegistration += sender()
+        case true => sender ! RegisteredAtJobManager
+        case false => waitForRegistration += sender
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index a1df1b9..616fb61 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -67,14 +67,14 @@ ActorLogMessages with ActorLogging {
     }
 
     case RegisterProfilingListener => {
-      listeners += sender()
+      listeners += sender
       if (monitoringScheduler.isEmpty) {
         startMonitoring
       }
     }
 
     case UnregisterProfilingListener => {
-      listeners -= sender()
+      listeners -= sender
       if (listeners.isEmpty) {
         stopMonitoring
       }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 35aac70..913941d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -92,6 +92,7 @@ public class TaskManagerTest {
                new JavaTestKit(system){{
                        try {
                                ActorRef jobManager = 
system.actorOf(Props.create(SimpleJobManager.class));
+
                                final ActorRef tm = 
createTaskManager(jobManager);
 
                                JobID jid = new JobID();
@@ -459,6 +460,7 @@ public class TaskManagerTest {
        // 
--------------------------------------------------------------------------------------------
 
        public static class SimpleJobManager extends UntypedActor{
+
                @Override
                public void onReceive(Object message) throws Exception {
                        if(message instanceof 
RegistrationMessages.RegisterTaskManager){
@@ -535,7 +537,7 @@ public class TaskManagerTest {
                Configuration cfg = new Configuration();
                cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
10);
                GlobalConfiguration.includeConfiguration(cfg);
-               String akkaURL = jm.path().toSerializationFormat();
+               String akkaURL = jm.path().toString();
                cfg.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, akkaURL);
 
                ActorRef taskManager = 
TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index bfb551f..059b7fa 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -47,20 +47,20 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
       currentJobs.get(jobID) match {
-        case Some((executionGraph, jobInfo)) => sender() ! 
ExecutionGraphFound(jobID,
+        case Some((executionGraph, jobInfo)) => sender ! 
ExecutionGraphFound(jobID,
           executionGraph)
-        case None => archive.tell(RequestExecutionGraph(jobID), sender())
+        case None => archive.tell(RequestExecutionGraph(jobID), sender)
       }
     case WaitForAllVerticesToBeRunning(jobID) =>
       if(checkIfAllVerticesRunning(jobID)){
-        sender() ! AllVerticesRunning(jobID)
+        sender ! AllVerticesRunning(jobID)
       }else{
         currentJobs.get(jobID) match {
           case Some((eg, _)) => eg.registerExecutionListener(self)
           case None =>
         }
         val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, 
Set[ActorRef]())
-        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender)
       }
     case ExecutionStateChanged(jobID, _, _, _, _, _, _, _, _) =>
       val cleanup = waitForAllVerticesToBeRunning.get(jobID) match {
@@ -84,12 +84,8 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
       }
 
       import context.dispatcher
-//      val f = Future.sequence(responses)
-//
-//      val t = Await.result(f, timeout)
-//
-//      sender() ! true
-      Future.fold(responses)(true)(_ & _) pipeTo sender()
+
+      Future.fold(responses)(true)(_ & _) pipeTo sender
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 3a6fb78..71d0feb 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -32,8 +32,8 @@ trait TestingMemoryArchivist extends ActorLogMessages {
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
       graphs.get(jobID) match {
-        case Some(executionGraph) => sender() ! ExecutionGraphFound(jobID, 
executionGraph)
-        case None => sender() ! ExecutionGraphNotFound(jobID)
+        case Some(executionGraph) => sender ! ExecutionGraphFound(jobID, 
executionGraph)
+        case None => sender ! ExecutionGraphNotFound(jobID)
       }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 31a43cb..080af11 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -40,13 +40,13 @@ trait TestingTaskManager extends ActorLogMessages {
 
   def receiveTestMessages: Receive = {
     case RequestRunningTasks =>
-      sender() ! ResponseRunningTasks(runningTasks.toMap)
+      sender ! ResponseRunningTasks(runningTasks.toMap)
     case NotifyWhenTaskRemoved(executionID) =>
       runningTasks.get(executionID) match {
         case Some(_) =>
           val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender()))
-        case None => sender() ! true
+          waitForRemoval += (executionID -> (set + sender))
+        case None => sender ! true
       }
     case UnregisterTask(executionID) =>
       super.receiveWithLogMessages(UnregisterTask(executionID))
@@ -55,19 +55,19 @@ trait TestingTaskManager extends ActorLogMessages {
         case None =>
       }
     case RequestBroadcastVariablesWithReferences => {
-      sender() ! ResponseBroadcastVariablesWithReferences(
+      sender ! ResponseBroadcastVariablesWithReferences(
         bcVarManager.getNumberOfVariablesWithReferences)
     }
     case NotifyWhenJobRemoved(jobID) => {
       if(runningTasks.values.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())
-        waitForJobRemoval += (jobID -> (set + sender()))
+        waitForJobRemoval += (jobID -> (set + sender))
         import context.dispatcher
         context.system.scheduler.scheduleOnce(200 milliseconds, this.self, 
CheckIfJobRemoved(jobID))
       }else{
         waitForJobRemoval.get(jobID) match {
-          case Some(listeners) => (listeners + sender()) foreach (_ ! true)
-          case None => sender() ! true
+          case Some(listeners) => (listeners + sender) foreach (_ ! true)
+          case None => sender ! true
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index ce068c9..4997f30 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -60,7 +60,6 @@ object TestingUtils {
     """.stripMargin
   }
 
-
   // scalastyle:off line.size.limit
   val getTestingSerializationBindings =
   """

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3eaa76e..2a252f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ under the License.
                <akka.version>2.3.7</akka.version>
                <scala.binary.version>2.10</scala.binary.version>
                <scala.macros.version>2.0.1</scala.macros.version>
+               <kryoserialization.version>0.3.2</kryoserialization.version>
        </properties>
 
        <dependencies>
@@ -284,7 +285,7 @@ under the License.
                        <dependency>
                                <groupId>com.github.romix.akka</groupId>
                                
<artifactId>akka-kryo-serialization_2.10</artifactId>
-                               <version>0.3.2</version>
+                               <version>${kryoserialization.version}</version>
                        </dependency>
 
                        <dependency>
@@ -376,6 +377,10 @@ under the License.
                                        <value>2.0.0-alpha</value>
                                </property>
                        </activation>
+                       <properties>
+                               <akka.version>2.2.1</akka.version>
+                               
<kryoserialization.version>0.3.1</kryoserialization.version>
+                       </properties>
                        <dependencyManagement>
                                <dependencies>
                                        <dependency>
@@ -387,10 +392,6 @@ under the License.
                                                                
<groupId>org.jboss.netty</groupId>
                                                                
<artifactId>netty</artifactId>
                                                        </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>com.google.protobuf</groupId>
-                                                               
<artifactId>protobuf-java</artifactId>
-                                                       </exclusion>
                                                </exclusions>
                                        </dependency>
                                </dependencies>

Reply via email to