Use Akka version 2.2.1 for hadoop version 2.0.0-alpha to resolve dependency conflicts. Adjust code to comply to respective Akka API. Remove obsolete TODO.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f5618fa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f5618fa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f5618fa6 Branch: refs/heads/master Commit: f5618fa6608000befce8d4d7a14d1bc6737833e9 Parents: f32c475 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 17 14:49:26 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:32 2014 +0100 ---------------------------------------------------------------------- .../org/apache/flink/yarn/YarnJobManager.scala | 2 +- .../flink/client/CliFrontendListCancelTest.java | 1 - .../apache/flink/runtime/akka/AkkaUtils.scala | 2 - .../apache/flink/runtime/client/JobClient.scala | 4 +- .../flink/runtime/jobmanager/JobManager.scala | 52 ++++++++++---------- .../runtime/jobmanager/MemoryArchivist.scala | 10 ++-- .../flink/runtime/taskmanager/TaskManager.scala | 14 +++--- .../taskmanager/TaskManagerProfiler.scala | 4 +- .../runtime/taskmanager/TaskManagerTest.java | 4 +- .../testingUtils/TestingJobManager.scala | 16 +++--- .../testingUtils/TestingMemoryArchivist.scala | 4 +- .../testingUtils/TestingTaskManager.scala | 14 +++--- .../runtime/testingUtils/TestingUtils.scala | 1 - pom.xml | 11 +++-- 14 files changed, 67 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index a1bed1b..af08f7b 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -91,7 +91,7 @@ trait YarnJobManager extends ActorLogMessages { context.system.shutdown() case RegisterMessageListener => - messageListener = Some(sender()) + messageListener = Some(sender) case StartYarnSession(conf) => { log.info("Start yarn session.") http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 8b10ee2..b9af927 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -35,7 +35,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -//TODO: Update test case public class CliFrontendListCancelTest { private static ActorSystem actorSystem; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 97ce343..5f6d59a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -121,7 +121,6 @@ object AkkaUtils { | | actor{ | default-dispatcher{ - | executor = "default-executor" | | throughput = ${akkaThroughput} | @@ -159,7 +158,6 @@ object AkkaUtils { | | actor{ | default-dispatcher{ - | executor = "default-executor" | | throughput = ${akkaThroughput} | http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 9377069..9a37cc0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -47,11 +47,11 @@ with ActorLogging{ override def receiveWithLogMessages: Receive = { case SubmitJobDetached(jobGraph) => - jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = true), sender()) + jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = true), sender) case cancelJob: CancelJob => jobManager forward cancelJob case SubmitJobAndWait(jobGraph, listen) => - val listener = context.actorOf(Props(classOf[JobClientListener], sender())) + val listener = context.actorOf(Props(classOf[JobClientListener], sender)) jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detach = false), listener) case RequestBlobManagerPort => jobManager forward RequestBlobManagerPort http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 17f123a..0dbbd92 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -103,7 +103,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { override def receiveWithLogMessages: Receive = { case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => { - val taskManager = sender() + val taskManager = sender val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) @@ -114,17 +114,17 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { } case RequestNumberRegisteredTaskManager => { - sender() ! instanceManager.getNumberOfRegisteredTaskManagers + sender ! instanceManager.getNumberOfRegisteredTaskManagers } case RequestTotalNumberOfSlots => { - sender() ! instanceManager.getTotalNumberOfSlots + sender ! instanceManager.getTotalNumberOfSlots } case SubmitJob(jobGraph, listenToEvents, detach) => { try { if (jobGraph == null) { - sender() ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + + sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + " null.")) } else { @@ -138,7 +138,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(), (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender(), System.currentTimeMillis()))) + JobInfo(sender, System.currentTimeMillis()))) val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){ jobGraph.getNumberOfExecutionRetries @@ -190,8 +190,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { if(listenToEvents){ // the sender will be notified about state changes - executionGraph.registerExecutionListener(sender()) - executionGraph.registerJobStatusListener(sender()) + executionGraph.registerExecutionListener(sender) + executionGraph.registerJobStatusListener(sender) } jobInfo.detach = detach @@ -200,7 +200,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { executionGraph.scheduleForExecution(scheduler) - sender() ! SubmissionSuccess(jobGraph.getJobID) + sender ! SubmissionSuccess(jobGraph.getJobID) } } catch { case t: Throwable => @@ -226,7 +226,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { } - sender() ! SubmissionFailure(jobGraph.getJobID, t) + sender ! SubmissionFailure(jobGraph.getJobID, t) } } @@ -238,27 +238,27 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { Future { executionGraph.cancel() } - sender() ! CancellationSuccess(jobID) + sender ! CancellationSuccess(jobID) case None => log.info(s"No job found with ID ${jobID}.") - sender() ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " + + sender ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " + s"ID ${jobID}.")) } } case UpdateTaskExecutionState(taskExecutionState) => { if(taskExecutionState == null){ - sender() ! false + sender ! false }else { currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => - val originalSender = sender() + val originalSender = sender Future { originalSender ! executionGraph.updateState(taskExecutionState) } case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState .getJobID} to change state to ${taskExecutionState.getExecutionState}.") - sender() ! false + sender ! false } } } @@ -300,7 +300,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { if(log.isDebugEnabled) { log.debug(s"Send next input split ${nextInputSplit}.") } - sender() ! NextInputSplit(nextInputSplit) + sender ! NextInputSplit(nextInputSplit) } case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => { @@ -342,7 +342,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { currentJobs.get(jobID) match { case Some(_) => val listeners = finalJobStatusListener.getOrElse(jobID, Set()) - finalJobStatusListener += jobID -> (listeners + sender()) + finalJobStatusListener += jobID -> (listeners + sender) case None => archive ! RequestJobStatus(jobID) } @@ -351,7 +351,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case LookupConnectionInformation(connectionInformation, jobID, sourceChannelID) => { currentJobs.get(jobID) match { case Some((executionGraph, _)) => - val originalSender = sender() + val originalSender = sender Future { originalSender ! ConnectionInformation( executionGraph.lookupConnectionInfoAndDeployReceivers @@ -359,7 +359,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { } case None => log.error(s"Cannot find execution graph for job ID ${jobID}.") - sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound()) + sender ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound()) } } @@ -371,14 +371,14 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case RequestAccumulatorResults(jobID) => { import scala.collection.JavaConverters._ - sender() ! AccumulatorResultsFound(jobID, accumulatorManager.getJobAccumulatorResults + sender ! AccumulatorResultsFound(jobID, accumulatorManager.getJobAccumulatorResults (jobID).asScala.toMap) } case RequestJobStatus(jobID) => { currentJobs.get(jobID) match { - case Some((executionGraph,_)) => sender() ! CurrentJobStatus(jobID, executionGraph.getState) - case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender() + case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState) + case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender } } @@ -387,23 +387,23 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case (_, (eg, jobInfo)) => eg } - sender() ! RunningJobs(executionGraphs) + sender ! RunningJobs(executionGraphs) } case RequestJob(jobID) => { currentJobs.get(jobID) match { - case Some((eg, _)) => sender() ! JobFound(jobID, eg) - case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender() + case Some((eg, _)) => sender ! JobFound(jobID, eg) + case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender } } case RequestBlobManagerPort => { - sender() ! libraryCacheManager.getBlobServerPort + sender ! libraryCacheManager.getBlobServerPort } case RequestRegisteredTaskManagers => { import scala.collection.JavaConverters._ - sender() ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) + sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) } case Heartbeat(instanceID) => { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index e63cf68..60aa4b1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -41,20 +41,20 @@ ActorLogging { } case RequestArchivedJobs => { - sender() ! ArchivedJobs(graphs.values) + sender ! ArchivedJobs(graphs.values) } case RequestJob(jobID) => { graphs.get(jobID) match { - case Some(graph) => sender() ! JobFound(jobID, graph) - case None => sender() ! JobNotFound(jobID) + case Some(graph) => sender ! JobFound(jobID, graph) + case None => sender ! JobNotFound(jobID) } } case RequestJobStatus(jobID) => { graphs.get(jobID) match { - case Some(eg) => sender() ! CurrentJobStatus(jobID, eg.getState) - case None => sender() ! JobNotFound(jobID) + case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState) + case None => sender ! JobNotFound(jobID) } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a4040fa..bb4a241 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -181,7 +181,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka case AcknowledgeRegistration(id, blobPort) => { if (!registered) { registered = true - currentJobManager = sender() + currentJobManager = sender instanceID = id context.watch(currentJobManager) @@ -216,9 +216,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka Future { task.cancelExecution() } - sender() ! new TaskOperationResult(executionID, true) + sender ! new TaskOperationResult(executionID, true) case None => - sender() ! new TaskOperationResult(executionID, false, "No task with that execution ID " + + sender ! new TaskOperationResult(executionID, false, "No task with that execution ID " + "was " + "found.") } @@ -295,7 +295,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka throw new RuntimeException("Cannot start task. Task was canceled or failed.") } - sender() ! TaskOperationResult(executionID, true) + sender ! TaskOperationResult(executionID, true) } catch { case t: Throwable => log.error(t, s"Could not instantiate task with execution ID ${executionID}.") @@ -316,7 +316,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka } } - sender() ! new TaskOperationResult(executionID, false, + sender ! new TaskOperationResult(executionID, false, ExceptionUtils.stringifyException(t)) } } @@ -343,8 +343,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka case NotifyWhenRegisteredAtJobManager => { registered match { - case true => sender() ! RegisteredAtJobManager - case false => waitForRegistration += sender() + case true => sender ! RegisteredAtJobManager + case false => waitForRegistration += sender } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala index a1df1b9..616fb61 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala @@ -67,14 +67,14 @@ ActorLogMessages with ActorLogging { } case RegisterProfilingListener => { - listeners += sender() + listeners += sender if (monitoringScheduler.isEmpty) { startMonitoring } } case UnregisterProfilingListener => { - listeners -= sender() + listeners -= sender if (listeners.isEmpty) { stopMonitoring } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 35aac70..913941d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -92,6 +92,7 @@ public class TaskManagerTest { new JavaTestKit(system){{ try { ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class)); + final ActorRef tm = createTaskManager(jobManager); JobID jid = new JobID(); @@ -459,6 +460,7 @@ public class TaskManagerTest { // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends UntypedActor{ + @Override public void onReceive(Object message) throws Exception { if(message instanceof RegistrationMessages.RegisterTaskManager){ @@ -535,7 +537,7 @@ public class TaskManagerTest { Configuration cfg = new Configuration(); cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10); GlobalConfiguration.includeConfiguration(cfg); - String akkaURL = jm.path().toSerializationFormat(); + String akkaURL = jm.path().toString(); cfg.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, akkaURL); ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index bfb551f..059b7fa 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -47,20 +47,20 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { def receiveTestingMessages: Receive = { case RequestExecutionGraph(jobID) => currentJobs.get(jobID) match { - case Some((executionGraph, jobInfo)) => sender() ! ExecutionGraphFound(jobID, + case Some((executionGraph, jobInfo)) => sender ! ExecutionGraphFound(jobID, executionGraph) - case None => archive.tell(RequestExecutionGraph(jobID), sender()) + case None => archive.tell(RequestExecutionGraph(jobID), sender) } case WaitForAllVerticesToBeRunning(jobID) => if(checkIfAllVerticesRunning(jobID)){ - sender() ! AllVerticesRunning(jobID) + sender ! AllVerticesRunning(jobID) }else{ currentJobs.get(jobID) match { case Some((eg, _)) => eg.registerExecutionListener(self) case None => } val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]()) - waitForAllVerticesToBeRunning += jobID -> (waiting + sender()) + waitForAllVerticesToBeRunning += jobID -> (waiting + sender) } case ExecutionStateChanged(jobID, _, _, _, _, _, _, _, _) => val cleanup = waitForAllVerticesToBeRunning.get(jobID) match { @@ -84,12 +84,8 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { } import context.dispatcher -// val f = Future.sequence(responses) -// -// val t = Await.result(f, timeout) -// -// sender() ! true - Future.fold(responses)(true)(_ & _) pipeTo sender() + + Future.fold(responses)(true)(_ & _) pipeTo sender } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index 3a6fb78..71d0feb 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -32,8 +32,8 @@ trait TestingMemoryArchivist extends ActorLogMessages { def receiveTestingMessages: Receive = { case RequestExecutionGraph(jobID) => graphs.get(jobID) match { - case Some(executionGraph) => sender() ! ExecutionGraphFound(jobID, executionGraph) - case None => sender() ! ExecutionGraphNotFound(jobID) + case Some(executionGraph) => sender ! ExecutionGraphFound(jobID, executionGraph) + case None => sender ! ExecutionGraphNotFound(jobID) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 31a43cb..080af11 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -40,13 +40,13 @@ trait TestingTaskManager extends ActorLogMessages { def receiveTestMessages: Receive = { case RequestRunningTasks => - sender() ! ResponseRunningTasks(runningTasks.toMap) + sender ! ResponseRunningTasks(runningTasks.toMap) case NotifyWhenTaskRemoved(executionID) => runningTasks.get(executionID) match { case Some(_) => val set = waitForRemoval.getOrElse(executionID, Set()) - waitForRemoval += (executionID -> (set + sender())) - case None => sender() ! true + waitForRemoval += (executionID -> (set + sender)) + case None => sender ! true } case UnregisterTask(executionID) => super.receiveWithLogMessages(UnregisterTask(executionID)) @@ -55,19 +55,19 @@ trait TestingTaskManager extends ActorLogMessages { case None => } case RequestBroadcastVariablesWithReferences => { - sender() ! ResponseBroadcastVariablesWithReferences( + sender ! ResponseBroadcastVariablesWithReferences( bcVarManager.getNumberOfVariablesWithReferences) } case NotifyWhenJobRemoved(jobID) => { if(runningTasks.values.exists(_.getJobID == jobID)){ val set = waitForJobRemoval.getOrElse(jobID, Set()) - waitForJobRemoval += (jobID -> (set + sender())) + waitForJobRemoval += (jobID -> (set + sender)) import context.dispatcher context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID)) }else{ waitForJobRemoval.get(jobID) match { - case Some(listeners) => (listeners + sender()) foreach (_ ! true) - case None => sender() ! true + case Some(listeners) => (listeners + sender) foreach (_ ! true) + case None => sender ! true } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index ce068c9..4997f30 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -60,7 +60,6 @@ object TestingUtils { """.stripMargin } - // scalastyle:off line.size.limit val getTestingSerializationBindings = """ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f5618fa6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3eaa76e..2a252f8 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ under the License. <akka.version>2.3.7</akka.version> <scala.binary.version>2.10</scala.binary.version> <scala.macros.version>2.0.1</scala.macros.version> + <kryoserialization.version>0.3.2</kryoserialization.version> </properties> <dependencies> @@ -284,7 +285,7 @@ under the License. <dependency> <groupId>com.github.romix.akka</groupId> <artifactId>akka-kryo-serialization_2.10</artifactId> - <version>0.3.2</version> + <version>${kryoserialization.version}</version> </dependency> <dependency> @@ -376,6 +377,10 @@ under the License. <value>2.0.0-alpha</value> </property> </activation> + <properties> + <akka.version>2.2.1</akka.version> + <kryoserialization.version>0.3.1</kryoserialization.version> + </properties> <dependencyManagement> <dependencies> <dependency> @@ -387,10 +392,6 @@ under the License. <groupId>org.jboss.netty</groupId> <artifactId>netty</artifactId> </exclusion> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> </exclusions> </dependency> </dependencies>
