Removed dead instance cleanup from InstanceManager so that Akka's watch 
mechanism is the current mean to detect dead instances.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8d414d7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8d414d7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8d414d7e

Branch: refs/heads/master
Commit: 8d414d7eb534bf5defa4aadd9b4f0a12a0be7ca8
Parents: 8eadd3e
Author: Till Rohrmann <[email protected]>
Authored: Mon Nov 17 18:33:45 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |   1 +
 .../flink/configuration/ConfigConstants.java    |  10 +-
 flink-runtime/pom.xml                           |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   3 +-
 .../apache/flink/runtime/instance/Instance.java |  27 +---
 .../flink/runtime/instance/InstanceManager.java |  85 +------------
 .../org/apache/flink/runtime/net/NetUtils.java  |   8 +-
 .../runtime/operators/RegularPactTask.java      |   1 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 125 ++++++++++++++-----
 .../flink/runtime/akka/KryoInitializer.scala    |  26 ++++
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  13 +-
 .../runtime/instance/InstanceManagerTest.java   |  79 ------------
 .../scheduler/SchedulerIsolatedTasksTest.java   |   6 -
 .../apache/flink/api/scala/ClosureCleaner.scala |   2 +-
 .../test/cancelling/CancellingTestBase.java     |   2 -
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../PartitionOperatorTranslationTest.scala      |   2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |   2 +-
 .../CoGroupCustomPartitioningTest.scala         |   2 +-
 .../CoGroupGroupSortTranslationTest.scala       |   2 +-
 ...tomPartitioningGroupingKeySelectorTest.scala |   2 +-
 .../CustomPartitioningGroupingPojoTest.scala    |   2 +-
 .../CustomPartitioningGroupingTupleTest.scala   |   2 +-
 .../translation/CustomPartitioningTest.scala    |   2 +-
 .../JoinCustomPartitioningTest.scala            |   2 +-
 .../translation/PartitioningTestClasses.scala   |  12 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   2 +-
 pom.xml                                         |   6 +
 30 files changed, 180 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 45848c2..c4e71ef 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -54,6 +54,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0ab180..ab1ed78 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -578,17 +578,17 @@ public final class ConfigConstants {
        
        // ------------------------------ Akka Values 
------------------------------
 
-       public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 
ms";
+       public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 
ms";
 
-       public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "10 s";
+       public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s";
 
        public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
-       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "1000 ms";
+       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
 
-       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "10 s";
+       public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s";
 
-       public static double DEFAULT_AKKA_WATCH_THRESHOLD = 10.0;
+       public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0;
 
        public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f04475c..b7edf7a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -123,6 +123,11 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>com.github.romix.akka</groupId>
+                       <artifactId>akka-kryo-serialization_2.10</artifactId>
+               </dependency>
+
+               <dependency>
                        <groupId>org.scalatest</groupId>
                        <artifactId>scalatest_2.10</artifactId>
                </dependency>
@@ -289,7 +294,6 @@ under the License.
                                        <systemPropertyVariables>
                                                <log.level>WARN</log.level>
                                        </systemPropertyVariables>
-                                       <reuseForks>false</reuseForks>
                                </configuration>
                        </plugin>
                        <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8faa235..7d38eac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,7 +59,7 @@ import org.apache.flink.util.ExceptionUtils;
 import static akka.dispatch.Futures.future;
 
 
-public class ExecutionGraph {
+public class ExecutionGraph implements Serializable {
 
        private static final AtomicReferenceFieldUpdater<ExecutionGraph, 
JobStatus> STATE_UPDATER =
                        
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, 
"state");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 18d3212..aaa276d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -116,32 +116,7 @@ public class Instance {
        public boolean isAlive() {
                return !isDead;
        }
-       
-       public void stopInstance() {
-               try {
-                       final TaskOperationProtocol tmProxy = 
this.getTaskManagerProxy();
-                       // start a thread for stopping the TM to avoid 
infinitive blocking.
-                       Runnable r = new Runnable() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               tmProxy.killTaskManager();
-                                       } catch (IOException e) {
-                                               if (Log.isDebugEnabled()) {
-                                                       Log.debug("Error while 
stopping TaskManager", e);
-                                               }
-                                       }
-                               }
-                       };
-                       Thread t = new Thread(r);
-                       t.setDaemon(true); // do not prevent the JVM from 
stopping
-                       t.start();
-               } catch (Exception e) {
-                       if (Log.isDebugEnabled()) {
-                               Log.debug("Error while stopping TaskManager", 
e);
-                       }
-               }
-       }
+
        public void markDead() {
                if (isDead) {
                        return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 10c89e4..3ce3ac7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -22,12 +22,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.ConfigConstants;
@@ -97,23 +94,11 @@ public class InstanceManager {
                this.registeredHostsByConnection = new HashMap<ActorRef, 
Instance>();
                this.deadHosts = new HashSet<ActorRef>();
                this.heartbeatTimeout = heartbeatTimeout;
-
-               new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, 
cleanupInterval);
        }
        
        public long getHeartbeatTimeout() {
                return heartbeatTimeout;
        }
-       
-       /**
-        * This method is only used by the Flink YARN client to self-destruct a 
Flink cluster
-        * by stopping the JVMs of the TaskManagers.
-        */
-       public void killTaskManagers() {
-               for (Instance i : this.registeredHostsById.values()) {
-                       i.stopInstance();
-               }
-       }
 
        public void shutdown() {
                synchronized (this.lock) {
@@ -122,8 +107,6 @@ public class InstanceManager {
                        }
                        this.shutdown = true;
 
-                       this.cleanupStaleMachines.cancel();
-
                        for (Instance i : this.registeredHostsById.values()) {
                                i.markDead();
                        }
@@ -213,7 +196,7 @@ public class InstanceManager {
 
                if(host != null){
                        registeredHostsByConnection.remove(taskManager);
-                       registeredHostsById.remove(taskManager);
+                       registeredHostsById.remove(host.getId());
                        deadHosts.add(taskManager);
 
                        host.markDead();
@@ -221,6 +204,10 @@ public class InstanceManager {
                        totalNumberOfAliveTaskSlots -= 
host.getTotalNumberOfSlots();
 
                        notifyDeadInstance(host);
+
+                       LOG.info("Unregistered task manager " + 
taskManager.path().address() + ". Number of " +
+                                       "registered task managers " + 
getNumberOfRegisteredTaskManagers() + ". Number" +
+                                       " of available slots " + 
getTotalNumberOfSlots() + ".");
                }
        }
 
@@ -272,70 +259,10 @@ public class InstanceManager {
                        for (InstanceListener listener : 
this.instanceListeners) {
                                try {
                                        listener.instanceDied(instance);
-                               }
-                               catch (Throwable t) {
+                               } catch (Throwable t) {
                                        LOG.error("Notification of dead 
instance failed.", t);
                                }
                        }
                }
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void checkForDeadInstances() {
-               final long now = System.currentTimeMillis();
-               final long timeout = InstanceManager.this.heartbeatTimeout;
-               
-               synchronized (InstanceManager.this.lock) {
-                       if (InstanceManager.this.shutdown) {
-                               return;
-                       }
-
-                       final Iterator<Map.Entry<InstanceID, Instance>> entries 
= registeredHostsById.entrySet().iterator();
-                       
-                       // check all hosts whether they did not send heart-beat 
messages.
-                       while (entries.hasNext()) {
-                               
-                               final Map.Entry<InstanceID, Instance> entry = 
entries.next();
-                               final Instance host = entry.getValue();
-                               
-                               if (!host.isStillAlive(now, timeout)) {
-                                       
-                                       // remove from the living
-                                       entries.remove();
-                                       
registeredHostsByConnection.remove(host.getTaskManager());
-
-                                       // add to the dead
-                                       deadHosts.add(host.getTaskManager());
-                                       
-                                       host.markDead();
-                                       
-                                       totalNumberOfAliveTaskSlots -= 
host.getTotalNumberOfSlots();
-                                       
-                                       LOG.info(String.format("TaskManager %s 
at %s did not report a heartbeat for %d msecs - marking as dead. Current number 
of registered hosts is %d.",
-                                                       host.getId(), 
host.getPath(), heartbeatTimeout, registeredHostsById.size()));
-                                       
-                                       // report to all listeners
-                                       notifyDeadInstance(host);
-                               }
-                       }
-               }
-       }
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Periodic task that checks whether hosts have not sent their 
heart-beat
-        * messages and purges the hosts in this case.
-        */
-       private final TimerTask cleanupStaleMachines = new TimerTask() {
-               @Override
-               public void run() {
-                       try {
-                               checkForDeadInstances();
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Checking for dead instances 
failed.", t);
-                       }
-               }
-       };
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 2cd929b..2795158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -367,6 +367,9 @@ public class NetUtils {
                                                        }
                                                        break;
                                                case HEURISTIC:
+                                                       
LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
+                                                                       
"isLinkLocalAddress:" + i.isLinkLocalAddress() +" " +
+                                                                       
"isLoopbackAddress:" + i.isLoopbackAddress() + ".");
                                                        
if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof 
Inet4Address){
                                                                
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to 
" +
                                                                                
"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -389,13 +392,16 @@ public class NetUtils {
                                        break;
                                case SLOW_CONNECT:
                                        
if(!InetAddress.getLocalHost().isLoopbackAddress()){
+                                               LOG.info("Heuristically taking 
" + InetAddress.getLocalHost() + " as own " +
+                                                               "IP address.");
                                                return 
InetAddress.getLocalHost();
                                        }else {
                                                strategy = 
AddressDetectionState.HEURISTIC;
                                                break;
                                        }
                                case HEURISTIC:
-                                       throw new RuntimeException("The 
TaskManager is unable to connect to the JobManager (Address: 
'"+jobManagerAddress+"').");
+                                       throw new RuntimeException("Unable to 
resolve own inet address by connecting " +
+                                                       "to address (" + 
jobManagerAddress + ").");
                        }
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Defaulting to detection strategy " + 
strategy);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 5f520e3..9ea4a74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1303,7 +1303,6 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                        final TypeComparator<T> comparator = 
compFactory.createComparator();
                                        oe = new OutputEmitter<T>(strategy, 
comparator, partitioner, dataDist);
                                }
-
                                writers.add(new 
RecordWriter<SerializationDelegate<T>>(task, oe));
                        }
                        if (eventualOutputs != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f931497..168dccb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -78,42 +78,105 @@ object AkkaUtils {
     val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
       ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
 
-    val configString = 
s"""akka.remote.transport-failure-detector.heartbeat-interval =
-                       $transportHeartbeatInterval
-       |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 
$transportHeartbeatPause
-       |akka.remote.transport-failure-detector.threshold = $transportThreshold
-       |akka.remote.watch-failure-detector.heartbeat-interval = 
$watchHeartbeatInterval
-       |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 
$watchHeartbeatPause
-       |akka.remote.wathc-failure-detector.threshold = $watchThreshold
-       |akka.remote.netty.tcp.hostname = $host
-       |akka.remote.netty.tcp.port = $port
-       |akka.remote.netty.tcp.connection-timeout = $akkaTCPTimeout
-       |akka.remote.netty.tcp.maximum-frame-size = $akkaFramesize
-       |akka.actor.default-dispatcher.throughput = $akkaThroughput
-       |akka.remote.log-remote-lifecycle-events = $logLifecycleEvents
-       |akka.log-dead-letters = $logLifecycleEvents
-       |akka.log-dead-letters-during-shutdown = $logLifecycleEvents
-       |akka.loglevel = "$logLevel"
-       |akka.stdout-loglevel = "$logLevel"
-     """.stripMargin
+    val configString =
+      s"""
+         |akka {
+         |  loglevel = "$logLevel"
+         |  stdout-loglevel = "$logLevel"
+         |
+         |  log-dead-letters = $logLifecycleEvents
+         |  log-dead-letters-during-shutdown = $logLifecycleEvents
+         |
+         |  extensions = 
["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"]
+         |
+         |  remote {
+         |    transport-failure-detector{
+         |      acceptable-heartbeat-pause = $transportHeartbeatPause
+         |      threshold = $transportThreshold
+         |      heartbeat-interval = $transportHeartbeatInterval
+         |    }
+         |
+         |    watch-failure-detector{
+         |      heartbeat-interval = $watchHeartbeatInterval
+         |      acceptable-heartbeat-pause = $watchHeartbeatPause
+         |      threshold = $watchThreshold
+         |    }
+         |
+         |    netty{
+         |      tcp{
+         |        hostname = $host
+         |        port = $port
+         |        connection-timeout = $akkaTCPTimeout
+         |        maximum-frame-size = $akkaFramesize
+         |      }
+         |    }
+         |
+         |    log-remote-lifecycle-events = $logLifecycleEvents
+         |
+         |  }
+         |
+         |  actor{
+         |    default-dispatcher{
+         |      throughput = $akkaThroughput
+         |    }
+         |
+         |    kryo{
+         |      type = "nograph"
+         |      idstrategy = "default"
+         |      serializer-pool-size = 16
+         |      buffer-size = 4096
+         |      max-buffer-size = -1
+         |      use-manifests = false
+         |      compression = off
+         |      implicit-registration-logging = true
+         |      kryo-trace = true
+         |      kryo-custom-serializer-init = 
"org.apache.flink.runtime.akka.KryoInitializer"
+         |    }
+         |
+         |    serialize-messages = on
+         |
+         |    serializers{
+         |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
+         |    }
+         |
+         |    serialization-bindings {
+         |    }
+         |  }
+         |}
+       """.stripMargin
 
     getDefaultActorSystemConfigString + configString
   }
 
   def getDefaultActorSystemConfigString: String = {
-    s"""akka.daemonic = on
-      |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "WARNING"
-      |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-      |akka.stdout-loglevel = "WARNING"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = 
"akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.log-config-on-start = off
-      |akka.remote.netty.tcp.port = 0
-      |akka.remote.netty.tcp.maximum-frame-size = 1MB
-    """.stripMargin
+    s"""
+       |akka {
+       |  daemonic = on
+       |
+       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
+       |  loglevel = "WARNING"
+       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+       |  stdout-loglevel = "WARNING"
+       |  jvm-exit-on-fatal-error = off
+       |  log-config-on-start = off
+       |
+       |  actor {
+       |    provider = "akka.remote.RemoteActorRefProvider"
+       |  }
+       |
+       |  remote{
+       |    netty{
+       |      tcp{
+       |        transport-class = "akka.remote.transport.netty.NettyTransport"
+       |        tcp-nodelay = on
+       |
+       |        port = 0
+       |        maximum-frame-size = 1MB
+       |      }
+       |    }
+       |  }
+       |}
+     """.stripMargin
   }
 
   def getDefaultActorSystemConfig = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
new file mode 100644
index 0000000..5f9854b
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import com.esotericsoftware.kryo.Kryo
+
+class KryoInitializer {
+  def cystomize(kryo: Kryo): Unit = {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a72c685..a18240e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
         hardwareInformation, numberOfSlots)
 
       // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager);
+//      context.watch(taskManager);
 
       taskManager ! AcknowledgeRegistration(instanceID, 
libraryCacheManager.getBlobServerPort)
     }
@@ -381,7 +381,7 @@ Actor with ActorLogMessages with ActorLogging with 
WrapAsScala {
     case Terminated(taskManager) => {
       log.info(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
-      context.unwatch(taskManager)
+//      context.unwatch(taskManager)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a145689..261d50a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -79,7 +79,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, 
val jobManagerAkka
   val REGISTRATION_DELAY = 0 seconds
   val REGISTRATION_INTERVAL = 10 seconds
   val MAX_REGISTRATION_ATTEMPTS = 10
-  val HEARTBEAT_INTERVAL = 1000 millisecond
+  val HEARTBEAT_INTERVAL = 5000 millisecond
 
   TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
@@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
-        context.watch(currentJobManager)
+//        context.watch(currentJobManager)
 
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
@@ -232,9 +232,18 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
       val taskIndex = tdd.getIndexInSubtaskGroup
       val numSubtasks = tdd.getCurrentNumberOfSubtasks
       var jarsRegistered = false
+      var startRegisteringTask = 0L
 
       try {
+        if(log.isDebugEnabled){
+          startRegisteringTask = System.currentTimeMillis()
+        }
         libraryCacheManager.registerTask(jobID, executionID, 
tdd.getRequiredJarFiles());
+
+        if(log.isDebugEnabled){
+          log.debug(s"Register task ${executionID} took 
${(System.currentTimeMillis() -
+            startRegisteringTask)/1000.0}s")
+        }
         jarsRegistered = true
 
         val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 1f63588..8a89503 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -235,83 +235,4 @@ public class InstanceManagerTest{
                        Assert.fail("Test erroneous: " + e.getMessage());
                }
        }
-
-       /**
-        * This test checks the clean-up routines of the cluster manager.
-        */
-       @Test
-       public void testCleanUp() {
-               try {
-                       InstanceManager cm = new InstanceManager(200, 100);
-
-                       HardwareDescription resources = 
HardwareDescription.extractFromSystem(4096);
-                       InetAddress address = 
InetAddress.getByName("127.0.0.1");
-                       InstanceConnectionInfo ici1 = new 
InstanceConnectionInfo(address, 20000);
-                       InstanceConnectionInfo ici2 = new 
InstanceConnectionInfo(address, 20001);
-
-                       JavaTestKit probe1 = new JavaTestKit(system);
-                       JavaTestKit probe2 = new JavaTestKit(system);
-                       // register two instances
-                       InstanceID i1 = cm.registerTaskManager(probe1.getRef(), 
ici1, resources, 1);
-                       InstanceID i2 = cm.registerTaskManager(probe2.getRef(), 
ici2, resources, 1);
-
-                       assertNotNull(i1);
-                       assertNotNull(i2);
-                       
-                       assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-                       assertEquals(2, cm.getTotalNumberOfSlots());
-
-                       // report a few heatbeats for both of the machines 
(each 50 msecs)...
-                       for (int i = 0; i < 8; i++) {
-                               CommonTestUtils.sleepUninterruptibly(50);
-                               
-                               assertTrue(cm.reportHeartBeat(i1));
-                               assertTrue(cm.reportHeartBeat(i2));
-                       }
-                       
-                       // all should be alive
-                       assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-                       assertEquals(2, cm.getTotalNumberOfSlots());
-
-                       // report a few heatbeats for both only one machine
-                       for (int i = 0; i < 8; i++) {
-                               CommonTestUtils.sleepUninterruptibly(50);
-                               
-                               assertTrue(cm.reportHeartBeat(i1));
-                       }
-                       
-                       // we should have lost one TM by now
-                       assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-                       assertEquals(1, cm.getTotalNumberOfSlots());
-                       
-                       // if the lost TM reports, it should not be accepted
-                       assertFalse(cm.reportHeartBeat(i2));
-                       
-                       // allow the lost TM to re-register itself
-                       i2 = cm.registerTaskManager(probe2.getRef(), ici2, 
resources, 1);
-                       assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-                       assertEquals(2, cm.getTotalNumberOfSlots());
-                       
-                       // report a few heatbeats for both of the machines 
(each 50 msecs)...
-                       for (int i = 0; i < 8; i++) {
-                               CommonTestUtils.sleepUninterruptibly(50);
-                               
-                               assertTrue(cm.reportHeartBeat(i1));
-                               assertTrue(cm.reportHeartBeat(i2));
-                       }
-                       
-                       // all should be alive
-                       assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-                       assertEquals(2, cm.getTotalNumberOfSlots());
-
-                       
-                       cm.shutdown();
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       Assert.fail("Test erroneous: " + e.getMessage());
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 240cdac..9418d77 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -38,14 +38,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 
 /**
  * Tests for the {@link Scheduler} when scheduling individual tasks.
@@ -284,9 +281,6 @@ public class SchedulerIsolatedTasksTest {
                        // the slots should all be different
                        assertTrue(areAllDistinct(slotsAfter.toArray()));
                        
-                       executor.shutdown();
-                       executor.awaitTermination(30, TimeUnit.SECONDS);
-                       
                        assertEquals(totalSlots, 
scheduler.getNumberOfAvailableSlots());
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index a3c564a..9740c82 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 303ee3d..63ca29d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -121,8 +121,6 @@ public abstract class CancellingTestBase {
                        actorSystem.scheduler().scheduleOnce(new 
FiniteDuration(msecsTillCanceling,
                                                        TimeUnit.MILLISECONDS), 
client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
                                        actorSystem.dispatcher(), 
ActorRef.noSender());
-                                               case RESTARTING:
-                                                       throw new 
IllegalStateException("Job restarted");
 
                        try {
                                Await.result(result, 
AkkaUtils.DEFAULT_TIMEOUT());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties 
b/flink-tests/src/test/resources/log4j-test.properties
index 0b686e5..2c2d4ff 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
 
################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
+log4j.rootLogger=DEBUG, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 1e44413..a83d728 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index a063957..0d6b763 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index d09fe60..a34c7d8 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 1c6afba..bd254fe 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 7304310..9535173 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 17ecc3f..93e3593 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index 8ffba8e..04a6285 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index b5f266f..e5b6c5f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index d4e438f..1dcf181 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 8cb49b8..d83d3be 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
index bcf1869..2a25be4 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.flink.api.common.functions.Partitioner
-import org.apache.flink.api.scala._
-import org.apache.flink.test.compiler.util.CompilerTestBase
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType
-import org.apache.flink.compiler.plan.SingleInputPlanNode
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.compiler.plan.DualInputPlanNode
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index 24dbfe5..d150e85 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58bf3db..6739948 100644
--- a/pom.xml
+++ b/pom.xml
@@ -282,6 +282,12 @@ under the License.
                        </dependency>
 
                        <dependency>
+                               <groupId>com.github.romix.akka</groupId>
+                               
<artifactId>akka-kryo-serialization_2.10</artifactId>
+                               <version>0.3.2</version>
+                       </dependency>
+
+                       <dependency>
                                <groupId>org.scalatest</groupId>
                                <artifactId>scalatest_2.10</artifactId>
                                <version>2.2.2</version>

Reply via email to