Remove experimental KryoSerialization for Akka
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae8fb948 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae8fb948 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae8fb948 Branch: refs/heads/master Commit: ae8fb948281c5b9ba6af8145bad21395c22a3e1a Parents: b962243 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 17 18:59:52 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:33 2014 +0100 ---------------------------------------------------------------------- .../mapreduce/HadoopOutputFormat.java | 47 ++-- flink-runtime/pom.xml | 5 - .../apache/flink/runtime/akka/AkkaUtils.scala | 177 --------------- .../flink/runtime/akka/KryoInitializer.scala | 221 ------------------- .../testingUtils/KryoTestingInitializer.scala | 46 ---- pom.xml | 6 - 6 files changed, 22 insertions(+), 480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java index cce7695..326f8ef 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java @@ -138,19 +138,18 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement } File dir = new File(this.configuration.get("mapred.output.dir")); - if(dir.exists()){ - if(dir.isDirectory()){ - File[] files = dir.listFiles(); - System.out.println(configuration.get("mapred.output.dir") + " contains the " + - "following files."); - for(File file: files){ - System.out.println(file.toPath()); - } - }else{ - System.out.println(configuration.get("mapred.output.dir") + " is not a directory."); + + if(dir.isDirectory()){ + File[] files = dir.listFiles(); + System.out.println(configuration.get("mapred.output.dir") + " contains the " + + "following files."); + for(File file: files){ + System.out.println(file.toURI()); } + }else if(dir.exists()){ + System.out.println(configuration.get("mapred.output.dir") + " is not a directory."); }else{ - System.out.println(configuration.get("mapred.output.dir") + " does not exist yet."); + System.out.println(configuration.get("mapred.output.dir") + " does not yet exists."); } } @@ -184,23 +183,20 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement Path outputPath = new Path(this.configuration.get("mapred.output.dir")); File dir = new File(this.configuration.get("mapred.output.dir")); - if(dir.exists()){ - if(dir.isDirectory()){ - File[] files = dir.listFiles(); - System.out.println("Close: " +configuration.get("mapred.output.dir") + " contains" + - " the " + - "following files."); - for(File file: files){ - System.out.println(file.toPath()); - } - }else{ - System.out.println("Close: " +configuration.get("mapred.output.dir") + " is not a" + - " directory."); + + if(dir.isDirectory()){ + File[] files = dir.listFiles(); + System.out.println(configuration.get("mapred.output.dir") + " contains the " + + "following files."); + for(File file: files){ + System.out.println(file.toURI()); } + }else if(dir.exists()){ + System.out.println(configuration.get("mapred.output.dir") + " is not a directory."); }else{ - System.out.println("Close: " +configuration.get("mapred.output.dir") + " does not " + - "exist yet))."); + System.out.println(configuration.get("mapred.output.dir") + " does not yet exists."); } + // rename tmp-file to final name FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); @@ -221,6 +217,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement @Override public void finalizeGlobal(int parallelism) throws IOException { + System.out.println("Finalize HadoopOutputFormat."); JobContext jobContext; TaskAttemptContext taskContext; try { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index b7edf7a..dee36f8 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -123,11 +123,6 @@ under the License. </dependency> <dependency> - <groupId>com.github.romix.akka</groupId> - <artifactId>akka-kryo-serialization_2.10</artifactId> - </dependency> - - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 5f6d59a..8e12443 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -214,183 +214,6 @@ object AkkaUtils { """.stripMargin } - // scalastyle:off line.size.limit - - def getKryoSerializerString: String = { - """ - |akka { - | - | extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"] - | - | actor{ - | kryo{ - | type = "graph" - | idstrategy = "incremental" - | serializer-pool-size = 16 - | buffer-size = 4096 - | max-buffer-size = -1 - | use-manifests = false - | compression = off - | implicit-registration-logging = true - | kryo-trace = false - | kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer" - | } - | - | - | serializers{ - | kryo = "com.romix.akka.serialization.kryo.KryoSerializer" - | java = "akka.serialization.JavaSerializer" - | } - | - | serialization-bindings { - | "java.io.Serializable" = java - | - | "java.lang.Throwable" = java - | "akka.event.Logging$Error" = java - | "java.lang.Integer" = kryo - | "java.lang.Long" = kryo - | "java.lang.Float" = kryo - | "java.lang.Double" = kryo - | "java.lang.Boolean" = kryo - | "java.lang.Short" = kryo - | - | "scala.Tuple2" = kryo - | "scala.Tuple3" = kryo - | "scala.Tuple4" = kryo - | "scala.Tuple5" = kryo - | "scala.Tuple6" = kryo - | "scala.Tuple7" = kryo - | "scala.Tuple8" = kryo - | "scala.Tuple9" = kryo - | "scala.Tuple10" = kryo - | "scala.Tuple11" = kryo - | "scala.Tuple12" = kryo - | "scala.collection.BitSet" = kryo - | "scala.collection.SortedSet" = kryo - | "scala.util.Left" = kryo - | "scala.util.Right" = kryo - | "scala.collection.SortedMap" = kryo - | "scala.Int" = kryo - | "scala.Long" = kryo - | "scala.Float" = kryo - | "scala.Double" = kryo - | "scala.Boolean" = kryo - | "scala.Short" = kryo - | "java.lang.String" = kryo - | "scala.Option" = kryo - | "scala.collection.immutable.Map" = kryo - | "scala.collection.Traversable" = kryo - | "scala.runtime.BoxedUnit" = kryo - | - | "akka.actor.SystemGuardian$RegisterTerminationHook$" = kryo - | "akka.actor.Address" = kryo - | "akka.actor.Terminated" = kryo - | "akka.actor.LocalActorRef" = kryo - | "akka.actor.RepointableActorRef" = kryo - | "akka.actor.Identify" = kryo - | "akka.actor.ActorIdentity" = kryo - | "akka.actor.PoisonPill$" = kryo - | "akka.actor.SystemGuardian$TerminationHook$" = kryo - | "akka.actor.SystemGuardian$TerminationHookDone$" = kryo - | "akka.actor.AddressTerminated" = kryo - | "akka.actor.Status$Failure" = kryo - | "akka.remote.RemoteWatcher$ReapUnreachableTick$" = kryo - | "akka.remote.RemoteWatcher$HeartbeatTick$" = kryo - | "akka.remote.ReliableDeliverySupervisor$GotUid" = kryo - | "akka.remote.EndpointWriter$AckIdleCheckTimer$" = kryo - | "akka.remote.EndpointWriter$StoppedReading" = kryo - | "akka.remote.ReliableDeliverySupervisor$Ungate$" = kryo - | "akka.remote.EndpointWriter$StopReading" = kryo - | "akka.remote.EndpointWriter$OutboundAck" = kryo - | "akka.remote.Ack" = kryo - | "akka.remote.SeqNo" = kryo - | "akka.remote.EndpointWriter$FlushAndStop$" = kryo - | "akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$" = kryo - | "akka.remote.RemoteWatcher$WatchRemote" = kryo - | "akka.remote.RemoteWatcher$UnwatchRemote" = kryo - | "akka.remote.RemoteWatcher$RewatchRemote" = kryo - | "akka.remote.RemoteWatcher$Rewatch" = kryo - | "akka.remote.RemoteWatcher$Heartbeat$" = kryo - | "akka.remote.RemoteWatcher$HeartbeatRsp" = kryo - | "akka.remote.EndpointWriter$FlushAndStopTimeout$" = kryo - | "akka.remote.RemoteWatcher$ExpectedFirstHeartbeat" = kryo - | "akka.remote.transport.Transport$InvalidAssociationException" = kryo - | "akka.dispatch.sysmsg.Terminate" = kryo - | "akka.dispatch.sysmsg.Unwatch" = kryo - | "akka.dispatch.sysmsg.Watch" = kryo - | "akka.dispatch.sysmsg.DeathWatchNotification" = kryo - | - | "org.apache.flink.runtime.messages.ArchiveMessages$ArchiveExecutionGraph" = kryo - | "org.apache.flink.runtime.messages.ArchiveMessages$ArchivedJobs" = kryo - | - | "org.apache.flink.runtime.messages.ExecutionGraphMessages$ExecutionStateChanged" = kryo - | "org.apache.flink.runtime.messages.ExecutionGraphMessages$JobStatusChanged" = kryo - | - | "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait" = kryo - | "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobDetached" = kryo - | - | "org.apache.flink.runtime.messages.JobManagerMessages$SubmitJob" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionSuccess" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionFailure" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$CancellationSuccess" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$CancellationFailure" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$CancelJob" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$UpdateTaskExecutionState" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestNextInputSplit" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$LookupConnectionInformation" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$ConnectionInformation" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$ReportAccumulatorResult" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestAccumulatorResults" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsFound" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsNotFound" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestJobStatus" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$CurrentJobStatus" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestNumberRegisteredTaskManager$" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestTotalNumberOfSlots$" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestFinalJobStatus" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$JobResultCanceled" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailed" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobs$" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RunningJobs" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestJob" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$JobFound" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$JobNotFound" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RequestRegisteredTaskManagers$" = kryo - | "org.apache.flink.runtime.messages.JobManagerMessages$RegisteredTaskManagers" = kryo - | - | "org.apache.flink.runtime.messages.JobManagerProfilerMessages$ReportProfilingData" = kryo - | - | "org.apache.flink.runtime.messages.TaskManagerMessages$NotifyWhenRegisteredAtJobManager$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$CancelTask" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$SubmitTask" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$NextInputSplit" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$UnregisterTask" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$TaskOperationResult" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$Heartbeat" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$RegisteredAtJobManager$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$SendHeartbeat$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerMessages$LogMemoryUsage$" = kryo - | - | "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$MonitorTask" = kryo - | "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnmonitorTask" = kryo - | "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$RegisterProfilingListener$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnregisterProfilingListener$" = kryo - | "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$ProfileTasks$" = kryo - | - | "org.apache.flink.runtime.messages.RegistrationMessages$RegisterTaskManager" = kryo - | "org.apache.flink.runtime.messages.RegistrationMessages$AcknowledgeRegistration" = kryo - | } - | } - |} - """.stripMargin - } - - // scalastyle:on line.size.limit - def getDefaultActorSystemConfig = { ConfigFactory.parseString(getDefaultActorSystemConfigString) } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala deleted file mode 100644 index 2a492fd..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.akka - -import java.net.Inet4Address - -import com.esotericsoftware.kryo.serializers.JavaSerializer -import com.esotericsoftware.kryo.{Serializer, Kryo} -import org.apache.flink.api.common.accumulators.Accumulator -import org.apache.flink.core.fs.FileInputSplit -import org.apache.flink.core.io.{LocatableInputSplit, GenericInputSplit} -import org.apache.flink.runtime.accumulators.AccumulatorEvent -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor -import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} -import org.apache.flink.runtime.instance.{Instance, InstanceID, HardwareDescription, -InstanceConnectionInfo} -import org.apache.flink.runtime.io.network.{RemoteReceiver, ConnectionInfoLookupResponse} -import org.apache.flink.runtime.io.network.channels.ChannelID -import org.apache.flink.runtime.jobgraph.{JobVertexID, JobStatus, JobID, JobGraph} -import org.apache.flink.runtime.messages.ArchiveMessages.{ArchivedJobs, RequestArchivedJobs, -ArchiveExecutionGraph} -import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged, -JobStatusChanged} -import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait} -import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData -import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, -RegisterTaskManager} -import org.apache.flink.runtime.messages.TaskManagerMessages._ -import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._ -import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer -import org.apache.flink.runtime.taskmanager.{Task, TaskExecutionState} - -class KryoInitializer { - def customize(kryo: Kryo): Unit = { - - register(kryo) - } - - def register(kryo: Kryo): Unit = { - def register(className: String): Unit = { - kryo.register(Class.forName(className)) - } - - def registerClass(clazz: Class[_], serializer: Serializer[_] = null): Unit = { - if(serializer != null){ - kryo.register(clazz, serializer) - }else { - kryo.register(clazz) - } - } - - register("scala.Some") - register("scala.None$") - register("scala.collection.immutable.Set$EmptySet$") - register("scala.runtime.BoxedUnit") - - register("akka.actor.SystemGuardian$RegisterTerminationHook$") - register("akka.actor.Address") - register("akka.actor.Terminated") - register("akka.actor.LocalActorRef") - register("akka.actor.RepointableActorRef") - register("akka.actor.Identify") - register("akka.actor.ActorIdentity") - register("akka.actor.PoisonPill$") - register("akka.actor.AddressTerminated") - register("akka.actor.Status$Failure") - register("akka.remote.RemoteWatcher$ReapUnreachableTick$") - register("akka.remote.RemoteWatcher$HeartbeatTick$") - register("akka.remote.ReliableDeliverySupervisor$GotUid") - register("akka.remote.EndpointWriter$AckIdleCheckTimer$") - register("akka.remote.EndpointWriter$StoppedReading") - register("akka.remote.ReliableDeliverySupervisor$Ungate$") - register("akka.remote.EndpointWriter$StopReading") - register("akka.remote.EndpointWriter$OutboundAck") - register("akka.remote.Ack") - register("akka.remote.SeqNo") - register("akka.remote.RemoteWatcher$HeartbeatRsp") - register("akka.actor.SystemGuardian$TerminationHook$") - register("akka.actor.SystemGuardian$TerminationHookDone$") - register("akka.remote.EndpointWriter$FlushAndStop$") - register("akka.remote.RemoteWatcher$WatchRemote") - register("akka.remote.RemoteWatcher$UnwatchRemote") - register("akka.remote.RemoteWatcher$Rewatch") - register("akka.remote.RemoteWatcher$RewatchRemote") - register("akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$") - register("akka.remote.RemoteActorRef") - register("akka.remote.RemoteWatcher$Heartbeat$") - register("akka.remote.EndpointWriter$FlushAndStopTimeout$") - register("akka.remote.RemoteWatcher$ExpectedFirstHeartbeat") - register("akka.remote.transport.Transport$InvalidAssociationException") - register("akka.remote.transport.AkkaProtocolException") - register("akka.dispatch.sysmsg.Terminate") - register("akka.dispatch.sysmsg.Unwatch") - register("akka.dispatch.sysmsg.Watch") - register("akka.dispatch.sysmsg.DeathWatchNotification") - -// register("java.util.Collections$UnmodifiableRandomAccessList") - - - //Register Flink messages - - kryo.setDefaultSerializer(classOf[JavaSerializer]) - - //misc types - registerClass(classOf[JobID]) - registerClass(classOf[JobVertexID]) - registerClass(classOf[ExecutionAttemptID]) - registerClass(classOf[InstanceID]) - registerClass(classOf[ExecutionState]) - registerClass(classOf[JobStatus]) - registerClass(classOf[TaskExecutionState]) - registerClass(classOf[InstanceConnectionInfo]) - registerClass(classOf[HardwareDescription]) - registerClass(classOf[Inet4Address]) - registerClass(classOf[ChannelID]) - registerClass(classOf[ConnectionInfoLookupResponse]) - registerClass(classOf[RemoteReceiver]) - registerClass(classOf[AccumulatorEvent], new JavaSerializer) - registerClass(classOf[Instance], new JavaSerializer()) - registerClass(classOf[JobGraph], new JavaSerializer()) - registerClass(classOf[TaskDeploymentDescriptor], new JavaSerializer()) - registerClass(classOf[ExecutionGraph], new JavaSerializer()) - registerClass(classOf[ProfilingDataContainer], new JavaSerializer) - registerClass(classOf[Task], new JavaSerializer) - registerClass(classOf[GenericInputSplit], new JavaSerializer) - registerClass(classOf[LocatableInputSplit], new JavaSerializer) - registerClass(classOf[FileInputSplit], new JavaSerializer) - registerClass(classOf[StackTraceElement]) - registerClass(classOf[Array[StackTraceElement]]) - - //Archive messages - registerClass(classOf[ArchiveExecutionGraph]) - registerClass(RequestArchivedJobs.getClass) - registerClass(classOf[ArchivedJobs]) - - //ExecutionGraph messages - registerClass(classOf[ExecutionStateChanged]) - registerClass(classOf[JobStatusChanged]) - - //JobClient messages - registerClass(classOf[SubmitJobAndWait]) - registerClass(classOf[SubmitJobDetached]) - - // JobManager messages - registerClass(classOf[SubmitJob]) - registerClass(classOf[SubmissionSuccess]) - registerClass(classOf[SubmissionFailure]) - registerClass(classOf[CancelJob]) - registerClass(classOf[UpdateTaskExecutionState]) - registerClass(classOf[RequestNextInputSplit]) - registerClass(classOf[LookupConnectionInformation]) - registerClass(classOf[ConnectionInformation]) - registerClass(classOf[ReportAccumulatorResult]) - registerClass(classOf[RequestAccumulatorResults]) - registerClass(classOf[AccumulatorResultsFound]) - registerClass(classOf[AccumulatorResultsNotFound]) - registerClass(classOf[RequestJobStatus]) - registerClass(classOf[CurrentJobStatus]) - registerClass(RequestNumberRegisteredTaskManager.getClass) - registerClass(RequestTotalNumberOfSlots.getClass) - registerClass(RequestBlobManagerPort.getClass) - registerClass(classOf[RequestFinalJobStatus]) - registerClass(classOf[JobResultSuccess]) - registerClass(classOf[JobResultCanceled]) - registerClass(classOf[JobResultFailed]) - registerClass(classOf[CancellationSuccess]) - registerClass(classOf[CancellationFailure]) - registerClass(RequestRunningJobs.getClass) - registerClass(classOf[RunningJobs]) - registerClass(classOf[RequestJob]) - registerClass(classOf[JobFound]) - registerClass(classOf[JobNotFound]) - registerClass(RequestRegisteredTaskManagers.getClass) - registerClass(classOf[RegisteredTaskManagers]) - - //JobManagerProfiler messages - registerClass(classOf[ReportProfilingData]) - - //Registration messages - registerClass(classOf[RegisterTaskManager]) - registerClass(classOf[AcknowledgeRegistration]) - - //TaskManager messages - registerClass(classOf[CancelTask]) - registerClass(classOf[SubmitTask]) - registerClass(classOf[NextInputSplit]) - registerClass(classOf[UnregisterTask]) - registerClass(classOf[TaskOperationResult]) - registerClass(NotifyWhenRegisteredAtJobManager.getClass) - registerClass(RegisterAtJobManager.getClass) - registerClass(RegisteredAtJobManager.getClass) - registerClass(SendHeartbeat.getClass) - registerClass(classOf[Heartbeat]) - registerClass(LogMemoryUsage.getClass) - - //TaskManagerProfiler messages - registerClass(classOf[MonitorTask]) - registerClass(classOf[UnmonitorTask]) - registerClass(RegisterProfilingListener.getClass) - registerClass(UnregisterProfilingListener.getClass) - registerClass(ProfileTasks.getClass) - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala deleted file mode 100644 index fc2ec8c..0000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testingUtils - -import com.esotericsoftware.kryo.Kryo -import org.apache.flink.runtime.akka.KryoInitializer -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ - -class KryoTestingInitializer { - def customize(kryo: Kryo): Unit = { - val initializer = new KryoInitializer - - initializer.customize(kryo) - - kryo.register(classOf[RequestExecutionGraph]) - kryo.register(classOf[ExecutionGraphFound]) - kryo.register(classOf[ExecutionGraphNotFound]) - kryo.register(classOf[WaitForAllVerticesToBeRunning]) - kryo.register(classOf[AllVerticesRunning]) - kryo.register(classOf[NotifyWhenJobRemoved]) - - kryo.register(classOf[NotifyWhenTaskRemoved]) - kryo.register(RequestRunningTasks.getClass) - kryo.register(classOf[ResponseRunningTasks]) - kryo.register(RequestBroadcastVariablesWithReferences.getClass) - kryo.register(classOf[ResponseBroadcastVariablesWithReferences]) - kryo.register(classOf[CheckIfJobRemoved]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae8fb948/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a166609..41dbd01 100644 --- a/pom.xml +++ b/pom.xml @@ -283,12 +283,6 @@ under the License. </dependency> <dependency> - <groupId>com.github.romix.akka</groupId> - <artifactId>akka-kryo-serialization_2.10</artifactId> - <version>${kryoserialization.version}</version> - </dependency> - - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> <version>2.2.2</version>
