Remove experimental KryoSerialization for Akka

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae8fb948
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae8fb948
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae8fb948

Branch: refs/heads/master
Commit: ae8fb948281c5b9ba6af8145bad21395c22a3e1a
Parents: b962243
Author: Till Rohrmann <[email protected]>
Authored: Wed Dec 17 18:59:52 2014 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 18 18:58:33 2014 +0100

----------------------------------------------------------------------
 .../mapreduce/HadoopOutputFormat.java           |  47 ++--
 flink-runtime/pom.xml                           |   5 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 177 ---------------
 .../flink/runtime/akka/KryoInitializer.scala    | 221 -------------------
 .../testingUtils/KryoTestingInitializer.scala   |  46 ----
 pom.xml                                         |   6 -
 6 files changed, 22 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index cce7695..326f8ef 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -138,19 +138,18 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
                }
 
                File dir = new 
File(this.configuration.get("mapred.output.dir"));
-               if(dir.exists()){
-                       if(dir.isDirectory()){
-                               File[] files = dir.listFiles();
-                               
System.out.println(configuration.get("mapred.output.dir") + " contains the " +
-                                               "following files.");
-                               for(File file: files){
-                                       System.out.println(file.toPath());
-                               }
-                       }else{
-                               
System.out.println(configuration.get("mapred.output.dir") + " is not a 
directory.");
+
+               if(dir.isDirectory()){
+                       File[] files = dir.listFiles();
+                       
System.out.println(configuration.get("mapred.output.dir") + " contains the " +
+                                       "following files.");
+                       for(File file: files){
+                               System.out.println(file.toURI());
                        }
+               }else if(dir.exists()){
+                       
System.out.println(configuration.get("mapred.output.dir") + " is not a 
directory.");
                }else{
-                       
System.out.println(configuration.get("mapred.output.dir") + " does not exist 
yet.");
+                       
System.out.println(configuration.get("mapred.output.dir") + " does not yet 
exists.");
                }
        }
        
@@ -184,23 +183,20 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
                Path outputPath = new 
Path(this.configuration.get("mapred.output.dir"));
 
                File dir = new 
File(this.configuration.get("mapred.output.dir"));
-               if(dir.exists()){
-                       if(dir.isDirectory()){
-                               File[] files = dir.listFiles();
-                               System.out.println("Close: " 
+configuration.get("mapred.output.dir") + " contains" +
-                                                               " the " +
-                                               "following files.");
-                               for(File file: files){
-                                       System.out.println(file.toPath());
-                               }
-                       }else{
-                               System.out.println("Close: " 
+configuration.get("mapred.output.dir") + " is not a" +
-                                               " directory.");
+
+               if(dir.isDirectory()){
+                       File[] files = dir.listFiles();
+                       
System.out.println(configuration.get("mapred.output.dir") + " contains the " +
+                                       "following files.");
+                       for(File file: files){
+                               System.out.println(file.toURI());
                        }
+               }else if(dir.exists()){
+                       
System.out.println(configuration.get("mapred.output.dir") + " is not a 
directory.");
                }else{
-                       System.out.println("Close: " 
+configuration.get("mapred.output.dir") + " does not " +
-                                       "exist yet)).");
+                       
System.out.println(configuration.get("mapred.output.dir") + " does not yet 
exists.");
                }
+
                
                // rename tmp-file to final name
                FileSystem fs = FileSystem.get(outputPath.toUri(), 
this.configuration);
@@ -221,6 +217,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
 
+               System.out.println("Finalize HadoopOutputFormat.");
                JobContext jobContext;
                TaskAttemptContext taskContext;
                try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b7edf7a..dee36f8 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -123,11 +123,6 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>com.github.romix.akka</groupId>
-                       <artifactId>akka-kryo-serialization_2.10</artifactId>
-               </dependency>
-
-               <dependency>
                        <groupId>org.scalatest</groupId>
                        <artifactId>scalatest_2.10</artifactId>
                </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5f6d59a..8e12443 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -214,183 +214,6 @@ object AkkaUtils {
     """.stripMargin
   }
 
-  // scalastyle:off line.size.limit
-
-  def getKryoSerializerString: String = {
-    """
-      |akka {
-      |
-      |  extensions = 
["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
-      |
-      |  actor{
-      |    kryo{
-      |      type = "graph"
-      |      idstrategy = "incremental"
-      |      serializer-pool-size = 16
-      |      buffer-size = 4096
-      |      max-buffer-size = -1
-      |      use-manifests = false
-      |      compression = off
-      |      implicit-registration-logging = true
-      |      kryo-trace = false
-      |      kryo-custom-serializer-init = 
"org.apache.flink.runtime.akka.KryoInitializer"
-      |    }
-      |
-      |
-      |    serializers{
-      |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
-      |      java = "akka.serialization.JavaSerializer"
-      |    }
-      |
-      |    serialization-bindings {
-      |      "java.io.Serializable" = java
-      |
-      |      "java.lang.Throwable" = java
-      |      "akka.event.Logging$Error" = java
-      |      "java.lang.Integer" = kryo
-      |      "java.lang.Long" = kryo
-      |      "java.lang.Float" = kryo
-      |      "java.lang.Double" = kryo
-      |      "java.lang.Boolean" = kryo
-      |      "java.lang.Short" = kryo
-      |
-      |      "scala.Tuple2" = kryo
-      |      "scala.Tuple3" = kryo
-      |      "scala.Tuple4" = kryo
-      |      "scala.Tuple5" = kryo
-      |      "scala.Tuple6" = kryo
-      |      "scala.Tuple7" = kryo
-      |      "scala.Tuple8" = kryo
-      |      "scala.Tuple9" = kryo
-      |      "scala.Tuple10" = kryo
-      |      "scala.Tuple11" = kryo
-      |      "scala.Tuple12" = kryo
-      |      "scala.collection.BitSet" = kryo
-      |      "scala.collection.SortedSet" = kryo
-      |      "scala.util.Left" = kryo
-      |      "scala.util.Right" = kryo
-      |      "scala.collection.SortedMap" = kryo
-      |      "scala.Int" = kryo
-      |      "scala.Long" = kryo
-      |      "scala.Float" = kryo
-      |      "scala.Double" = kryo
-      |      "scala.Boolean" = kryo
-      |      "scala.Short" = kryo
-      |      "java.lang.String" = kryo
-      |      "scala.Option" = kryo
-      |      "scala.collection.immutable.Map" = kryo
-      |      "scala.collection.Traversable" = kryo
-      |      "scala.runtime.BoxedUnit" = kryo
-      |
-      |      "akka.actor.SystemGuardian$RegisterTerminationHook$" = kryo
-      |      "akka.actor.Address" = kryo
-      |      "akka.actor.Terminated" = kryo
-      |      "akka.actor.LocalActorRef" = kryo
-      |      "akka.actor.RepointableActorRef" = kryo
-      |      "akka.actor.Identify" = kryo
-      |      "akka.actor.ActorIdentity" = kryo
-      |      "akka.actor.PoisonPill$" = kryo
-      |      "akka.actor.SystemGuardian$TerminationHook$" = kryo
-      |      "akka.actor.SystemGuardian$TerminationHookDone$" = kryo
-      |      "akka.actor.AddressTerminated" = kryo
-      |      "akka.actor.Status$Failure" = kryo
-      |      "akka.remote.RemoteWatcher$ReapUnreachableTick$" = kryo
-      |      "akka.remote.RemoteWatcher$HeartbeatTick$" = kryo
-      |      "akka.remote.ReliableDeliverySupervisor$GotUid" = kryo
-      |      "akka.remote.EndpointWriter$AckIdleCheckTimer$" = kryo
-      |      "akka.remote.EndpointWriter$StoppedReading" = kryo
-      |      "akka.remote.ReliableDeliverySupervisor$Ungate$" = kryo
-      |      "akka.remote.EndpointWriter$StopReading" = kryo
-      |      "akka.remote.EndpointWriter$OutboundAck" = kryo
-      |      "akka.remote.Ack" = kryo
-      |      "akka.remote.SeqNo" = kryo
-      |      "akka.remote.EndpointWriter$FlushAndStop$" = kryo
-      |      "akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$" 
= kryo
-      |      "akka.remote.RemoteWatcher$WatchRemote" = kryo
-      |      "akka.remote.RemoteWatcher$UnwatchRemote" = kryo
-      |      "akka.remote.RemoteWatcher$RewatchRemote" = kryo
-      |      "akka.remote.RemoteWatcher$Rewatch" = kryo
-      |      "akka.remote.RemoteWatcher$Heartbeat$" = kryo
-      |      "akka.remote.RemoteWatcher$HeartbeatRsp" = kryo
-      |      "akka.remote.EndpointWriter$FlushAndStopTimeout$" = kryo
-      |      "akka.remote.RemoteWatcher$ExpectedFirstHeartbeat" = kryo
-      |      "akka.remote.transport.Transport$InvalidAssociationException" = 
kryo
-      |      "akka.dispatch.sysmsg.Terminate" = kryo
-      |      "akka.dispatch.sysmsg.Unwatch" = kryo
-      |      "akka.dispatch.sysmsg.Watch" = kryo
-      |      "akka.dispatch.sysmsg.DeathWatchNotification" = kryo
-      |
-      |      
"org.apache.flink.runtime.messages.ArchiveMessages$ArchiveExecutionGraph" = kryo
-      |      "org.apache.flink.runtime.messages.ArchiveMessages$ArchivedJobs" 
= kryo
-      |
-      |      
"org.apache.flink.runtime.messages.ExecutionGraphMessages$ExecutionStateChanged"
 = kryo
-      |      
"org.apache.flink.runtime.messages.ExecutionGraphMessages$JobStatusChanged" = 
kryo
-      |
-      |      
"org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait" = kryo
-      |      
"org.apache.flink.runtime.messages.JobClientMessages$SubmitJobDetached" = kryo
-      |
-      |      "org.apache.flink.runtime.messages.JobManagerMessages$SubmitJob" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$SubmissionSuccess" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$SubmissionFailure" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$CancellationSuccess" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$CancellationFailure" = 
kryo
-      |      "org.apache.flink.runtime.messages.JobManagerMessages$CancelJob" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$UpdateTaskExecutionState" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestNextInputSplit" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$LookupConnectionInformation"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$ConnectionInformation" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$ReportAccumulatorResult" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestAccumulatorResults"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsFound" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsNotFound"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestJobStatus" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$CurrentJobStatus" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestNumberRegisteredTaskManager$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestTotalNumberOfSlots$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$" 
= kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestFinalJobStatus" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$JobResultCanceled" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailed" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobs$" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RunningJobs" = kryo
-      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestJob" 
= kryo
-      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobFound" = 
kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$JobNotFound" = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RequestRegisteredTaskManagers$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.JobManagerMessages$RegisteredTaskManagers" = 
kryo
-      |
-      |      
"org.apache.flink.runtime.messages.JobManagerProfilerMessages$ReportProfilingData"
 = kryo
-      |
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$NotifyWhenRegisteredAtJobManager$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = 
kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$CancelTask" = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$SubmitTask" = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$NextInputSplit" = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$UnregisterTask" = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$TaskOperationResult" = 
kryo
-      |      "org.apache.flink.runtime.messages.TaskManagerMessages$Heartbeat" 
= kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$RegisteredAtJobManager$" 
= kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = 
kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$SendHeartbeat$" = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerMessages$LogMemoryUsage$" = kryo
-      |
-      |      
"org.apache.flink.runtime.messages.TaskManagerProfilerMessages$MonitorTask" = 
kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnmonitorTask" = 
kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerProfilerMessages$RegisterProfilingListener$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnregisterProfilingListener$"
 = kryo
-      |      
"org.apache.flink.runtime.messages.TaskManagerProfilerMessages$ProfileTasks$" = 
kryo
-      |
-      |      
"org.apache.flink.runtime.messages.RegistrationMessages$RegisterTaskManager" = 
kryo
-      |      
"org.apache.flink.runtime.messages.RegistrationMessages$AcknowledgeRegistration"
 = kryo
-      |    }
-      |  }
-      |}
-    """.stripMargin
-  }
-
-  // scalastyle:on line.size.limit
-
   def getDefaultActorSystemConfig = {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
deleted file mode 100644
index 2a492fd..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka
-
-import java.net.Inet4Address
-
-import com.esotericsoftware.kryo.serializers.JavaSerializer
-import com.esotericsoftware.kryo.{Serializer, Kryo}
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.core.fs.FileInputSplit
-import org.apache.flink.core.io.{LocatableInputSplit, GenericInputSplit}
-import org.apache.flink.runtime.accumulators.AccumulatorEvent
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
-import org.apache.flink.runtime.instance.{Instance, InstanceID, 
HardwareDescription,
-InstanceConnectionInfo}
-import org.apache.flink.runtime.io.network.{RemoteReceiver, 
ConnectionInfoLookupResponse}
-import org.apache.flink.runtime.io.network.channels.ChannelID
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobStatus, JobID, 
JobGraph}
-import org.apache.flink.runtime.messages.ArchiveMessages.{ArchivedJobs, 
RequestArchivedJobs,
-ArchiveExecutionGraph}
-import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged,
-JobStatusChanged}
-import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, 
SubmitJobAndWait}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import 
org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
-import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
-RegisterTaskManager}
-import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._
-import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
-import org.apache.flink.runtime.taskmanager.{Task, TaskExecutionState}
-
-class KryoInitializer {
-  def customize(kryo: Kryo): Unit = {
-
-    register(kryo)
-  }
-
-  def register(kryo: Kryo): Unit = {
-    def register(className: String): Unit = {
-      kryo.register(Class.forName(className))
-    }
-
-    def registerClass(clazz: Class[_], serializer: Serializer[_] = null): Unit 
= {
-      if(serializer != null){
-        kryo.register(clazz, serializer)
-      }else {
-        kryo.register(clazz)
-      }
-    }
-
-    register("scala.Some")
-    register("scala.None$")
-    register("scala.collection.immutable.Set$EmptySet$")
-    register("scala.runtime.BoxedUnit")
-
-    register("akka.actor.SystemGuardian$RegisterTerminationHook$")
-    register("akka.actor.Address")
-    register("akka.actor.Terminated")
-    register("akka.actor.LocalActorRef")
-    register("akka.actor.RepointableActorRef")
-    register("akka.actor.Identify")
-    register("akka.actor.ActorIdentity")
-    register("akka.actor.PoisonPill$")
-    register("akka.actor.AddressTerminated")
-    register("akka.actor.Status$Failure")
-    register("akka.remote.RemoteWatcher$ReapUnreachableTick$")
-    register("akka.remote.RemoteWatcher$HeartbeatTick$")
-    register("akka.remote.ReliableDeliverySupervisor$GotUid")
-    register("akka.remote.EndpointWriter$AckIdleCheckTimer$")
-    register("akka.remote.EndpointWriter$StoppedReading")
-    register("akka.remote.ReliableDeliverySupervisor$Ungate$")
-    register("akka.remote.EndpointWriter$StopReading")
-    register("akka.remote.EndpointWriter$OutboundAck")
-    register("akka.remote.Ack")
-    register("akka.remote.SeqNo")
-    register("akka.remote.RemoteWatcher$HeartbeatRsp")
-    register("akka.actor.SystemGuardian$TerminationHook$")
-    register("akka.actor.SystemGuardian$TerminationHookDone$")
-    register("akka.remote.EndpointWriter$FlushAndStop$")
-    register("akka.remote.RemoteWatcher$WatchRemote")
-    register("akka.remote.RemoteWatcher$UnwatchRemote")
-    register("akka.remote.RemoteWatcher$Rewatch")
-    register("akka.remote.RemoteWatcher$RewatchRemote")
-    register("akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$")
-    register("akka.remote.RemoteActorRef")
-    register("akka.remote.RemoteWatcher$Heartbeat$")
-    register("akka.remote.EndpointWriter$FlushAndStopTimeout$")
-    register("akka.remote.RemoteWatcher$ExpectedFirstHeartbeat")
-    register("akka.remote.transport.Transport$InvalidAssociationException")
-    register("akka.remote.transport.AkkaProtocolException")
-    register("akka.dispatch.sysmsg.Terminate")
-    register("akka.dispatch.sysmsg.Unwatch")
-    register("akka.dispatch.sysmsg.Watch")
-    register("akka.dispatch.sysmsg.DeathWatchNotification")
-
-//    register("java.util.Collections$UnmodifiableRandomAccessList")
-
-
-    //Register Flink messages
-
-    kryo.setDefaultSerializer(classOf[JavaSerializer])
-
-    //misc types
-    registerClass(classOf[JobID])
-    registerClass(classOf[JobVertexID])
-    registerClass(classOf[ExecutionAttemptID])
-    registerClass(classOf[InstanceID])
-    registerClass(classOf[ExecutionState])
-    registerClass(classOf[JobStatus])
-    registerClass(classOf[TaskExecutionState])
-    registerClass(classOf[InstanceConnectionInfo])
-    registerClass(classOf[HardwareDescription])
-    registerClass(classOf[Inet4Address])
-    registerClass(classOf[ChannelID])
-    registerClass(classOf[ConnectionInfoLookupResponse])
-    registerClass(classOf[RemoteReceiver])
-    registerClass(classOf[AccumulatorEvent], new JavaSerializer)
-    registerClass(classOf[Instance], new JavaSerializer())
-    registerClass(classOf[JobGraph], new JavaSerializer())
-    registerClass(classOf[TaskDeploymentDescriptor], new JavaSerializer())
-    registerClass(classOf[ExecutionGraph], new JavaSerializer())
-    registerClass(classOf[ProfilingDataContainer], new JavaSerializer)
-    registerClass(classOf[Task], new JavaSerializer)
-    registerClass(classOf[GenericInputSplit], new JavaSerializer)
-    registerClass(classOf[LocatableInputSplit], new JavaSerializer)
-    registerClass(classOf[FileInputSplit], new JavaSerializer)
-    registerClass(classOf[StackTraceElement])
-    registerClass(classOf[Array[StackTraceElement]])
-
-    //Archive messages
-    registerClass(classOf[ArchiveExecutionGraph])
-    registerClass(RequestArchivedJobs.getClass)
-    registerClass(classOf[ArchivedJobs])
-
-    //ExecutionGraph messages
-    registerClass(classOf[ExecutionStateChanged])
-    registerClass(classOf[JobStatusChanged])
-
-    //JobClient messages
-    registerClass(classOf[SubmitJobAndWait])
-    registerClass(classOf[SubmitJobDetached])
-
-    // JobManager messages
-    registerClass(classOf[SubmitJob])
-    registerClass(classOf[SubmissionSuccess])
-    registerClass(classOf[SubmissionFailure])
-    registerClass(classOf[CancelJob])
-    registerClass(classOf[UpdateTaskExecutionState])
-    registerClass(classOf[RequestNextInputSplit])
-    registerClass(classOf[LookupConnectionInformation])
-    registerClass(classOf[ConnectionInformation])
-    registerClass(classOf[ReportAccumulatorResult])
-    registerClass(classOf[RequestAccumulatorResults])
-    registerClass(classOf[AccumulatorResultsFound])
-    registerClass(classOf[AccumulatorResultsNotFound])
-    registerClass(classOf[RequestJobStatus])
-    registerClass(classOf[CurrentJobStatus])
-    registerClass(RequestNumberRegisteredTaskManager.getClass)
-    registerClass(RequestTotalNumberOfSlots.getClass)
-    registerClass(RequestBlobManagerPort.getClass)
-    registerClass(classOf[RequestFinalJobStatus])
-    registerClass(classOf[JobResultSuccess])
-    registerClass(classOf[JobResultCanceled])
-    registerClass(classOf[JobResultFailed])
-    registerClass(classOf[CancellationSuccess])
-    registerClass(classOf[CancellationFailure])
-    registerClass(RequestRunningJobs.getClass)
-    registerClass(classOf[RunningJobs])
-    registerClass(classOf[RequestJob])
-    registerClass(classOf[JobFound])
-    registerClass(classOf[JobNotFound])
-    registerClass(RequestRegisteredTaskManagers.getClass)
-    registerClass(classOf[RegisteredTaskManagers])
-
-    //JobManagerProfiler messages
-    registerClass(classOf[ReportProfilingData])
-
-    //Registration messages
-    registerClass(classOf[RegisterTaskManager])
-    registerClass(classOf[AcknowledgeRegistration])
-
-    //TaskManager messages
-    registerClass(classOf[CancelTask])
-    registerClass(classOf[SubmitTask])
-    registerClass(classOf[NextInputSplit])
-    registerClass(classOf[UnregisterTask])
-    registerClass(classOf[TaskOperationResult])
-    registerClass(NotifyWhenRegisteredAtJobManager.getClass)
-    registerClass(RegisterAtJobManager.getClass)
-    registerClass(RegisteredAtJobManager.getClass)
-    registerClass(SendHeartbeat.getClass)
-    registerClass(classOf[Heartbeat])
-    registerClass(LogMemoryUsage.getClass)
-
-    //TaskManagerProfiler messages
-    registerClass(classOf[MonitorTask])
-    registerClass(classOf[UnmonitorTask])
-    registerClass(RegisterProfilingListener.getClass)
-    registerClass(UnregisterProfilingListener.getClass)
-    registerClass(ProfileTasks.getClass)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
deleted file mode 100644
index fc2ec8c..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import com.esotericsoftware.kryo.Kryo
-import org.apache.flink.runtime.akka.KryoInitializer
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-class KryoTestingInitializer {
-  def customize(kryo: Kryo): Unit = {
-    val initializer = new KryoInitializer
-
-    initializer.customize(kryo)
-
-    kryo.register(classOf[RequestExecutionGraph])
-    kryo.register(classOf[ExecutionGraphFound])
-    kryo.register(classOf[ExecutionGraphNotFound])
-    kryo.register(classOf[WaitForAllVerticesToBeRunning])
-    kryo.register(classOf[AllVerticesRunning])
-    kryo.register(classOf[NotifyWhenJobRemoved])
-
-    kryo.register(classOf[NotifyWhenTaskRemoved])
-    kryo.register(RequestRunningTasks.getClass)
-    kryo.register(classOf[ResponseRunningTasks])
-    kryo.register(RequestBroadcastVariablesWithReferences.getClass)
-    kryo.register(classOf[ResponseBroadcastVariablesWithReferences])
-    kryo.register(classOf[CheckIfJobRemoved])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a166609..41dbd01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -283,12 +283,6 @@ under the License.
                        </dependency>
 
                        <dependency>
-                               <groupId>com.github.romix.akka</groupId>
-                               
<artifactId>akka-kryo-serialization_2.10</artifactId>
-                               <version>${kryoserialization.version}</version>
-                       </dependency>
-
-                       <dependency>
                                <groupId>org.scalatest</groupId>
                                <artifactId>scalatest_2.10</artifactId>
                                <version>2.2.2</version>

Reply via email to