Removed dead instance cleanup from InstanceManager so that Akka's watch mechanism is the current means to detect dead instances.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8d414d7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8d414d7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8d414d7e Branch: refs/heads/master Commit: 8d414d7eb534bf5defa4aadd9b4f0a12a0be7ca8 Parents: 8eadd3e Author: Till Rohrmann <[email protected]> Authored: Mon Nov 17 18:33:45 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:31 2014 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 1 + .../flink/configuration/ConfigConstants.java | 10 +- flink-runtime/pom.xml | 6 +- .../runtime/executiongraph/ExecutionGraph.java | 3 +- .../apache/flink/runtime/instance/Instance.java | 27 +--- .../flink/runtime/instance/InstanceManager.java | 85 +------------ .../org/apache/flink/runtime/net/NetUtils.java | 8 +- .../runtime/operators/RegularPactTask.java | 1 - .../apache/flink/runtime/akka/AkkaUtils.scala | 125 ++++++++++++++----- .../flink/runtime/akka/KryoInitializer.scala | 26 ++++ .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../flink/runtime/taskmanager/TaskManager.scala | 13 +- .../runtime/instance/InstanceManagerTest.java | 79 ------------ .../scheduler/SchedulerIsolatedTasksTest.java | 6 - .../apache/flink/api/scala/ClosureCleaner.scala | 2 +- .../test/cancelling/CancellingTestBase.java | 2 - .../src/test/resources/log4j-test.properties | 2 +- .../PartitionOperatorTranslationTest.scala | 2 +- .../scala/functions/ClosureCleanerITCase.scala | 2 +- .../misc/MassiveCaseClassSortingITCase.scala | 2 +- .../CoGroupCustomPartitioningTest.scala | 2 +- .../CoGroupGroupSortTranslationTest.scala | 2 +- ...tomPartitioningGroupingKeySelectorTest.scala | 2 +- .../CustomPartitioningGroupingPojoTest.scala | 2 +- .../CustomPartitioningGroupingTupleTest.scala | 2 +- 
.../translation/CustomPartitioningTest.scala | 2 +- .../JoinCustomPartitioningTest.scala | 2 +- .../translation/PartitioningTestClasses.scala | 12 +- .../scala/runtime/CaseClassComparatorTest.scala | 2 +- pom.xml | 6 + 30 files changed, 180 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 45848c2..c4e71ef 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -54,6 +54,7 @@ import com.google.common.base.Preconditions; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index f0ab180..ab1ed78 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -578,17 +578,17 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ - public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 ms"; + public static String 
DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 ms"; - public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "10 s"; + public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s"; public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0; - public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "1000 ms"; + public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms"; - public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "10 s"; + public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s"; - public static double DEFAULT_AKKA_WATCH_THRESHOLD = 10.0; + public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0; public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s"; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index f04475c..b7edf7a 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -123,6 +123,11 @@ under the License. </dependency> <dependency> + <groupId>com.github.romix.akka</groupId> + <artifactId>akka-kryo-serialization_2.10</artifactId> + </dependency> + + <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> </dependency> @@ -289,7 +294,6 @@ under the License. 
<systemPropertyVariables> <log.level>WARN</log.level> </systemPropertyVariables> - <reuseForks>false</reuseForks> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 8faa235..7d38eac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import java.io.Serializable; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -58,7 +59,7 @@ import org.apache.flink.util.ExceptionUtils; import static akka.dispatch.Futures.future; -public class ExecutionGraph { +public class ExecutionGraph implements Serializable { private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 18d3212..aaa276d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -116,32 +116,7 @@ public class Instance { public boolean isAlive() { 
return !isDead; } - - public void stopInstance() { - try { - final TaskOperationProtocol tmProxy = this.getTaskManagerProxy(); - // start a thread for stopping the TM to avoid infinitive blocking. - Runnable r = new Runnable() { - @Override - public void run() { - try { - tmProxy.killTaskManager(); - } catch (IOException e) { - if (Log.isDebugEnabled()) { - Log.debug("Error while stopping TaskManager", e); - } - } - } - }; - Thread t = new Thread(r); - t.setDaemon(true); // do not prevent the JVM from stopping - t.start(); - } catch (Exception e) { - if (Log.isDebugEnabled()) { - Log.debug("Error while stopping TaskManager", e); - } - } - } + public void markDead() { if (isDead) { return; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index 10c89e4..3ce3ac7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -22,12 +22,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import akka.actor.ActorRef; import org.apache.flink.configuration.ConfigConstants; @@ -97,23 +94,11 @@ public class InstanceManager { this.registeredHostsByConnection = new HashMap<ActorRef, Instance>(); this.deadHosts = new HashSet<ActorRef>(); this.heartbeatTimeout = heartbeatTimeout; - - new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval); } public long getHeartbeatTimeout() { 
return heartbeatTimeout; } - - /** - * This method is only used by the Flink YARN client to self-destruct a Flink cluster - * by stopping the JVMs of the TaskManagers. - */ - public void killTaskManagers() { - for (Instance i : this.registeredHostsById.values()) { - i.stopInstance(); - } - } public void shutdown() { synchronized (this.lock) { @@ -122,8 +107,6 @@ public class InstanceManager { } this.shutdown = true; - this.cleanupStaleMachines.cancel(); - for (Instance i : this.registeredHostsById.values()) { i.markDead(); } @@ -213,7 +196,7 @@ public class InstanceManager { if(host != null){ registeredHostsByConnection.remove(taskManager); - registeredHostsById.remove(taskManager); + registeredHostsById.remove(host.getId()); deadHosts.add(taskManager); host.markDead(); @@ -221,6 +204,10 @@ public class InstanceManager { totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots(); notifyDeadInstance(host); + + LOG.info("Unregistered task manager " + taskManager.path().address() + ". Number of " + + "registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" + + " of available slots " + getTotalNumberOfSlots() + "."); } } @@ -272,70 +259,10 @@ public class InstanceManager { for (InstanceListener listener : this.instanceListeners) { try { listener.instanceDied(instance); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Notification of dead instance failed.", t); } } } } - - // -------------------------------------------------------------------------------------------- - - private void checkForDeadInstances() { - final long now = System.currentTimeMillis(); - final long timeout = InstanceManager.this.heartbeatTimeout; - - synchronized (InstanceManager.this.lock) { - if (InstanceManager.this.shutdown) { - return; - } - - final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator(); - - // check all hosts whether they did not send heart-beat messages. 
- while (entries.hasNext()) { - - final Map.Entry<InstanceID, Instance> entry = entries.next(); - final Instance host = entry.getValue(); - - if (!host.isStillAlive(now, timeout)) { - - // remove from the living - entries.remove(); - registeredHostsByConnection.remove(host.getTaskManager()); - - // add to the dead - deadHosts.add(host.getTaskManager()); - - host.markDead(); - - totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots(); - - LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.", - host.getId(), host.getPath(), heartbeatTimeout, registeredHostsById.size())); - - // report to all listeners - notifyDeadInstance(host); - } - } - } - } - // -------------------------------------------------------------------------------------------- - - /** - * Periodic task that checks whether hosts have not sent their heart-beat - * messages and purges the hosts in this case. - */ - private final TimerTask cleanupStaleMachines = new TimerTask() { - @Override - public void run() { - try { - checkForDeadInstances(); - } - catch (Throwable t) { - LOG.error("Checking for dead instances failed.", t); - } - } - }; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java index 2cd929b..2795158 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java @@ -367,6 +367,9 @@ public class NetUtils { } break; case HEURISTIC: + LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" + + "isLinkLocalAddress:" + i.isLinkLocalAddress() +" " + + "isLoopbackAddress:" + 
i.isLoopbackAddress() + "."); if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){ LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " + "loopback address. Using instead " + i.getHostAddress() + " on network " + @@ -389,13 +392,16 @@ public class NetUtils { break; case SLOW_CONNECT: if(!InetAddress.getLocalHost().isLoopbackAddress()){ + LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " + + "IP address."); return InetAddress.getLocalHost(); }else { strategy = AddressDetectionState.HEURISTIC; break; } case HEURISTIC: - throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"')."); + throw new RuntimeException("Unable to resolve own inet address by connecting " + + "to address (" + jobManagerAddress + ")."); } if (LOG.isDebugEnabled()) { LOG.debug("Defaulting to detection strategy " + strategy); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 5f520e3..9ea4a74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -1303,7 +1303,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i final TypeComparator<T> comparator = compFactory.createComparator(); oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist); } - writers.add(new RecordWriter<SerializationDelegate<T>>(task, oe)); } if (eventualOutputs != null) { 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index f931497..168dccb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -78,42 +78,105 @@ object AkkaUtils { val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL, ConfigConstants.DEFAULT_AKKA_LOG_LEVEL) - val configString = s"""akka.remote.transport-failure-detector.heartbeat-interval = - $transportHeartbeatInterval - |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $transportHeartbeatPause - |akka.remote.transport-failure-detector.threshold = $transportThreshold - |akka.remote.watch-failure-detector.heartbeat-interval = $watchHeartbeatInterval - |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $watchHeartbeatPause - |akka.remote.wathc-failure-detector.threshold = $watchThreshold - |akka.remote.netty.tcp.hostname = $host - |akka.remote.netty.tcp.port = $port - |akka.remote.netty.tcp.connection-timeout = $akkaTCPTimeout - |akka.remote.netty.tcp.maximum-frame-size = $akkaFramesize - |akka.actor.default-dispatcher.throughput = $akkaThroughput - |akka.remote.log-remote-lifecycle-events = $logLifecycleEvents - |akka.log-dead-letters = $logLifecycleEvents - |akka.log-dead-letters-during-shutdown = $logLifecycleEvents - |akka.loglevel = "$logLevel" - |akka.stdout-loglevel = "$logLevel" - """.stripMargin + val configString = + s""" + |akka { + | loglevel = "$logLevel" + | stdout-loglevel = "$logLevel" + | + | log-dead-letters = $logLifecycleEvents + | log-dead-letters-during-shutdown = $logLifecycleEvents + | + | extensions = 
["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"] + | + | remote { + | transport-failure-detector{ + | acceptable-heartbeat-pause = $transportHeartbeatPause + | threshold = $transportThreshold + | heartbeat-interval = $transportHeartbeatInterval + | } + | + | watch-failure-detector{ + | heartbeat-interval = $watchHeartbeatInterval + | acceptable-heartbeat-pause = $watchHeartbeatPause + | threshold = $watchThreshold + | } + | + | netty{ + | tcp{ + | hostname = $host + | port = $port + | connection-timeout = $akkaTCPTimeout + | maximum-frame-size = $akkaFramesize + | } + | } + | + | log-remote-lifecycle-events = $logLifecycleEvents + | + | } + | + | actor{ + | default-dispatcher{ + | throughput = $akkaThroughput + | } + | + | kryo{ + | type = "nograph" + | idstrategy = "default" + | serializer-pool-size = 16 + | buffer-size = 4096 + | max-buffer-size = -1 + | use-manifests = false + | compression = off + | implicit-registration-logging = true + | kryo-trace = true + | kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer" + | } + | + | serialize-messages = on + | + | serializers{ + | kryo = "com.romix.akka.serialization.kryo.KryoSerializer" + | } + | + | serialization-bindings { + | } + | } + |} + """.stripMargin getDefaultActorSystemConfigString + configString } def getDefaultActorSystemConfigString: String = { - s"""akka.daemonic = on - |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] - |akka.loglevel = "WARNING" - |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - |akka.stdout-loglevel = "WARNING" - |akka.jvm-exit-on-fatal-error = off - |akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" - |akka.remote.netty.tcp.tcp-nodelay = on - |akka.log-config-on-start = off - |akka.remote.netty.tcp.port = 0 - |akka.remote.netty.tcp.maximum-frame-size = 1MB - """.stripMargin + s""" + |akka { + | daemonic = on + | + | loggers = 
["akka.event.slf4j.Slf4jLogger"] + | loglevel = "WARNING" + | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + | stdout-loglevel = "WARNING" + | jvm-exit-on-fatal-error = off + | log-config-on-start = off + | + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | + | remote{ + | netty{ + | tcp{ + | transport-class = "akka.remote.transport.netty.NettyTransport" + | tcp-nodelay = on + | + | port = 0 + | maximum-frame-size = 1MB + | } + | } + | } + |} + """.stripMargin } def getDefaultActorSystemConfig = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala new file mode 100644 index 0000000..5f9854b --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.akka + +import com.esotericsoftware.kryo.Kryo + +class KryoInitializer { + def cystomize(kryo: Kryo): Unit = { + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index a72c685..a18240e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { hardwareInformation, numberOfSlots) // to be notified when the taskManager is no longer reachable - context.watch(taskManager); +// context.watch(taskManager); taskManager ! 
AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort) } @@ -381,7 +381,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { case Terminated(taskManager) => { log.info(s"Task manager ${taskManager.path} terminated.") instanceManager.unregisterTaskManager(taskManager) - context.unwatch(taskManager) +// context.unwatch(taskManager) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a145689..261d50a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -79,7 +79,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka val REGISTRATION_DELAY = 0 seconds val REGISTRATION_INTERVAL = 10 seconds val MAX_REGISTRATION_ATTEMPTS = 10 - val HEARTBEAT_INTERVAL = 1000 millisecond + val HEARTBEAT_INTERVAL = 5000 millisecond TaskManager.checkTempDirs(tmpDirPaths) val ioManager = new IOManagerAsync(tmpDirPaths) @@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka currentJobManager = sender() instanceID = id - context.watch(currentJobManager) +// context.watch(currentJobManager) log.info(s"TaskManager successfully registered at JobManager ${ currentJobManager.path @@ -232,9 +232,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka val taskIndex = tdd.getIndexInSubtaskGroup val numSubtasks = tdd.getCurrentNumberOfSubtasks var jarsRegistered = false + var startRegisteringTask = 0L try { + if(log.isDebugEnabled){ + startRegisteringTask = 
System.currentTimeMillis() + } libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()); + + if(log.isDebugEnabled){ + log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() - + startRegisteringTask)/1000.0}s") + } jarsRegistered = true val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java index 1f63588..8a89503 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java @@ -235,83 +235,4 @@ public class InstanceManagerTest{ Assert.fail("Test erroneous: " + e.getMessage()); } } - - /** - * This test checks the clean-up routines of the cluster manager. 
- */ - @Test - public void testCleanUp() { - try { - InstanceManager cm = new InstanceManager(200, 100); - - HardwareDescription resources = HardwareDescription.extractFromSystem(4096); - InetAddress address = InetAddress.getByName("127.0.0.1"); - InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 20000); - InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 20001); - - JavaTestKit probe1 = new JavaTestKit(system); - JavaTestKit probe2 = new JavaTestKit(system); - // register two instances - InstanceID i1 = cm.registerTaskManager(probe1.getRef(), ici1, resources, 1); - InstanceID i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1); - - assertNotNull(i1); - assertNotNull(i2); - - assertEquals(2, cm.getNumberOfRegisteredTaskManagers()); - assertEquals(2, cm.getTotalNumberOfSlots()); - - // report a few heatbeats for both of the machines (each 50 msecs)... - for (int i = 0; i < 8; i++) { - CommonTestUtils.sleepUninterruptibly(50); - - assertTrue(cm.reportHeartBeat(i1)); - assertTrue(cm.reportHeartBeat(i2)); - } - - // all should be alive - assertEquals(2, cm.getNumberOfRegisteredTaskManagers()); - assertEquals(2, cm.getTotalNumberOfSlots()); - - // report a few heatbeats for both only one machine - for (int i = 0; i < 8; i++) { - CommonTestUtils.sleepUninterruptibly(50); - - assertTrue(cm.reportHeartBeat(i1)); - } - - // we should have lost one TM by now - assertEquals(1, cm.getNumberOfRegisteredTaskManagers()); - assertEquals(1, cm.getTotalNumberOfSlots()); - - // if the lost TM reports, it should not be accepted - assertFalse(cm.reportHeartBeat(i2)); - - // allow the lost TM to re-register itself - i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1); - assertEquals(2, cm.getNumberOfRegisteredTaskManagers()); - assertEquals(2, cm.getTotalNumberOfSlots()); - - // report a few heatbeats for both of the machines (each 50 msecs)... 
- for (int i = 0; i < 8; i++) { - CommonTestUtils.sleepUninterruptibly(50); - - assertTrue(cm.reportHeartBeat(i1)); - assertTrue(cm.reportHeartBeat(i2)); - } - - // all should be alive - assertEquals(2, cm.getNumberOfRegisteredTaskManagers()); - assertEquals(2, cm.getTotalNumberOfSlots()); - - - cm.shutdown(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Test erroneous: " + e.getMessage()); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index 240cdac..9418d77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -38,14 +38,11 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.util.ExecutorThreadFactory; /** * Tests for the {@link Scheduler} when scheduling individual tasks. 
@@ -284,9 +281,6 @@ public class SchedulerIsolatedTasksTest { // the slots should all be different assertTrue(areAllDistinct(slotsAfter.toArray())); - executor.shutdown(); - executor.awaitTermination(30, TimeUnit.SECONDS); - assertEquals(totalSlots, scheduler.getNumberOfAvailableSlots()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala index a3c564a..9740c82 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java index 303ee3d..63ca29d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java @@ -121,8 +121,6 @@ public abstract class CancellingTestBase { actorSystem.scheduler().scheduleOnce(new 
FiniteDuration(msecsTillCanceling, TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()), actorSystem.dispatcher(), ActorRef.noSender()); - case RESTARTING: - throw new IllegalStateException("Job restarted"); try { Await.result(result, AkkaUtils.DEFAULT_TIMEOUT()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties index 0b686e5..2c2d4ff 100644 --- a/flink-tests/src/test/resources/log4j-test.properties +++ b/flink-tests/src/test/resources/log4j-test.properties @@ -17,7 +17,7 @@ ################################################################################ # Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 +log4j.rootLogger=DEBUG, A1 # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala index 1e44413..a83d728 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala index a063957..0d6b763 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index d09fe60..a34c7d8 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala index 1c6afba..bd254fe 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala index 7304310..9535173 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala index 17ecc3f..93e3593 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala index 8ffba8e..04a6285 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala index b5f266f..e5b6c5f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala index d4e438f..1dcf181 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala index 8cb49b8..d83d3be 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala index bcf1869..2a25be4 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,7 @@ package org.apache.flink.api.scala.operators.translation -import org.junit.Assert._ -import org.junit.Test import org.apache.flink.api.common.functions.Partitioner -import org.apache.flink.api.scala._ -import org.apache.flink.test.compiler.util.CompilerTestBase -import org.apache.flink.runtime.operators.shipping.ShipStrategyType -import org.apache.flink.compiler.plan.SingleInputPlanNode -import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.compiler.plan.DualInputPlanNode http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala index 24dbfe5..d150e85 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 58bf3db..6739948 100644 --- a/pom.xml +++ b/pom.xml @@ -282,6 +282,12 @@ under the License. </dependency> <dependency> + <groupId>com.github.romix.akka</groupId> + <artifactId>akka-kryo-serialization_2.10</artifactId> + <version>0.3.2</version> + </dependency> + + <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> <version>2.2.2</version>
