Alan/TD,

We are facing this problem in a project that is going to production.
Has there been any progress on this? Can we confirm that this is a bug or limitation in the current streaming code, or is something wrong on the user side?

Regards,
Rohit

Founder & CEO, Tuplejump, Inc.
____________________________
www.tuplejump.com
The Data Engineering Platform

On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <a...@opsclarity.com> wrote:

> The stack trace was from running the Actor count sample directly, without
> a Spark cluster, so I guess the logs would be from both? I enabled more
> logging and got this stack trace:
>
> 4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
> 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
> 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
> 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
> 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
> 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
> 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
> 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
> 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
> 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
> 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
> 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
> 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
> 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
> 14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
> 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
> 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
> 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
> 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
> 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
> 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
> 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
> 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
> 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
> 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
> 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
> 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
> 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
> 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
> 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
> 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
> 14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
> 14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
> 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
> 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>     at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
>     at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
>     at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
>     at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
>     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:722)
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
> 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
> 14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
> 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
> creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
> 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>     at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>     at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>     at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>     at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>     at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>     at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>     at akka.actor.Props.newActor(Props.scala:339)
>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>     ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>     at akka.util.Reflect$.instantiate(Reflect.scala:69)
>     at akka.actor.Props.cachedActorClass(Props.scala:203)
>     at akka.actor.Props.actorClass(Props.scala:327)
>     at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>     ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>     ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
> 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000
>
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs: the driver logs and, if possible, the executor logs as well?
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <a...@opsclarity.com> wrote:
>
>> bump. any ideas?
>>
>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <a...@opsclarity.com> wrote:
>>
>> It looks like when you configure SparkConf to use the KryoSerializer in
>> combination with an ActorReceiver, bad things happen. I modified the
>> ActorWordCount example program from
>>
>>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>
>> to
>>
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer",
>>         "org.apache.spark.serializer.KryoSerializer")
>>
>> and I get the stack trace below. I figured it might be that Kryo doesn't
>> know how to serialize/deserialize the actor, so I added a registry.
>> I also added a default empty constructor to SampleActorReceiver, just for kicks:
>>
>>     class SerializationRegistry extends KryoRegistrator {
>>       override def registerClasses(kryo: Kryo) {
>>         kryo.register(classOf[SampleActorReceiver])
>>       }
>>     }
>>
>>     …
>>
>>     case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>>       extends Actor with ActorHelper {
>>       def this() = this("")
>>       ...
>>     }
>>
>>     ...
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer",
>>         "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.registrator",
>>         "org.apache.spark.examples.streaming.SerializationRegistry")
>>
>>
>> None of this worked; I get the same stack trace. Any idea what's going on? Is
>> this a known issue, and is there a workaround?
>>
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> akka.actor.ActorInitializationException: exception during creation
>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>>     at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>     at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>>     at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>>     at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>>     at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>     at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>>     at akka.actor.Props.newActor(Props.scala:339)
>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>     ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>     at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>     at akka.actor.Props.cachedActorClass(Props.scala:203)
>>     at akka.actor.Props.actorClass(Props.scala:327)
>>     at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>>     ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>     ... 24 more