It looks like configuring SparkConf to use the KryoSerializer together with an ActorReceiver causes the receiver actor to fail during creation. I modified the ActorWordCount example program from

    val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I get the stack trace below.

I figured Kryo might not know how to serialize/deserialize the actor, so I added a registrator. I also added a default empty constructor to SampleActorReceiver, just for kicks:

    class SerializationRegistry extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[SampleActorReceiver])
      }
    }

    ...

    case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
      extends Actor with ActorHelper {
      def this() = this("")
      ...
    }

    ...

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")

None of this worked; I get the same stack trace. Any idea what's going on? Is this a known issue, and is there a workaround?

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
    at akka.actor.ActorCell.create(ActorCell.scala:578)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
    at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
    at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
    at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
    at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
    at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
    at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
    at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    ... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
    at akka.util.Reflect$.instantiate(Reflect.scala:69)
    at akka.actor.Props.cachedActorClass(Props.scala:203)
    at akka.actor.Props.actorClass(Props.scala:327)
    at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
    at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
    ... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at akka.util.Reflect$.instantiate(Reflect.scala:65)
    ... 24 more