It looks like configuring SparkConf to use the KryoSerializer in combination 
with an ActorReceiver breaks actor creation.  I modified the 
ActorWordCount example program from 

    val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I get the stack trace below.  I figured it might be that Kryo doesn’t know 
how to serialize/deserialize the actor, so I added a Kryo registrator.  I also 
added a default empty constructor to SampleActorReceiver just for kicks:

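    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Registers the actor class with Kryo.
    class SerializationRegistry extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[SampleActorReceiver[_]])
      }
    }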

…

    case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
    extends Actor with ActorHelper {
      def this() = this("")
      ...
    }

...
    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator",
        "org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked; I still get the same stack trace.  Any idea what’s going 
on?  Is this a known issue, and is there a workaround?

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
        at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
        at akka.actor.ActorCell.create(ActorCell.scala:578)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
        at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
        at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
        at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
        at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
        at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
        at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
        at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
        at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
        at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
        at akka.actor.Props.newActor(Props.scala:339)
        at akka.actor.ActorCell.newActor(ActorCell.scala:534)
        at akka.actor.ActorCell.create(ActorCell.scala:560)
        ... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
        at akka.util.Reflect$.instantiate(Reflect.scala:69)
        at akka.actor.Props.cachedActorClass(Props.scala:203)
        at akka.actor.Props.actorClass(Props.scala:327)
        at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
        at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
        ... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at akka.util.Reflect$.instantiate(Reflect.scala:65)
        ... 24 more
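
To help read the innermost cause: the trace shows a constructor that takes a single scala.Function0 being invoked reflectively with two arguments (a Class and the creator function), and java.lang.reflect.Constructor.newInstance rejects that with an IllegalArgumentException. The sketch below reproduces only that reflection mismatch in plain Scala. It does not use Akka, Spark, or Kryo, and it does not explain why the argument list ends up with two entries. OneArgConsumer and ReflectMismatchDemo are hypothetical names made up for illustration.

```scala
import java.lang.reflect.Constructor

// Hypothetical stand-in for a class whose only public constructor takes a
// single Function0, like the constructor named in the trace.
class OneArgConsumer(val creator: () => AnyRef)

object ReflectMismatchDemo {
  // Invoke the one-argument constructor reflectively with the given arguments
  // and report either success or the exception that was thrown.
  def instantiate(args: AnyRef*): String = {
    val ctor: Constructor[_] = classOf[OneArgConsumer].getConstructors.head
    try {
      ctor.newInstance(args: _*)
      "ok"
    } catch {
      case e: IllegalArgumentException =>
        s"${e.getClass.getSimpleName}: ${e.getMessage}"
    }
  }

  def main(argv: Array[String]): Unit = {
    val creator: () => AnyRef = () => "actor"
    // One argument: matches the constructor signature.
    println(instantiate(creator))
    // Two arguments (a Class plus the function), as in the trace: rejected.
    println(instantiate(classOf[String], creator))
  }
}
```

The exact exception message text varies between JVM versions, so the sketch only relies on the exception type.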
