
We are facing this problem in a project that is going to production.

Has there been any progress on this? Can we confirm that this is a
bug or limitation in the current streaming code, or is there something
wrong on the user side?
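
For reference, the innermost cause in the traces quoted below is
java.lang.IllegalArgumentException: wrong number of arguments, thrown when
akka.util.Reflect$.instantiate calls Constructor.newInstance with two
arguments ([class java.lang.Class, class ...ActorWordCount$$anonfun$2])
against a constructor that does not accept them. Below is a minimal sketch,
using only JDK reflection, of how that exception arises. SingleArgCreator and
WrongArityDemo are hypothetical names, not Akka or Spark types, the
single-argument constructor is an assumption (the real constructor signature
is cut off in the trace), and the sketch does not explain why the Kryo
configuration ends up producing the mismatched arguments.

```scala
// Hypothetical stand-in for a creator class whose only public constructor
// takes a single function argument. This is NOT an Akka type.
class SingleArgCreator(val creator: () => Any)

object WrongArityDemo {
  // Reflectively constructs SingleArgCreator with the given arguments, the
  // way a Props-style "class + argument list" is turned into an instance.
  // Returns the exception's simple name on failure.
  def tryConstruct(args: Seq[AnyRef]): Either[String, SingleArgCreator] = {
    val ctor = classOf[SingleArgCreator].getConstructors.head
    try Right(ctor.newInstance(args: _*).asInstanceOf[SingleArgCreator])
    catch {
      case e: IllegalArgumentException => Left(e.getClass.getSimpleName)
    }
  }

  def main(args: Array[String]): Unit = {
    val creator: () => Any = () => "actor"
    // One argument matches the constructor's arity.
    println(tryConstruct(Seq(creator)).isRight) // prints true
    // Two arguments, shaped like [class java.lang.Class, class ...$$anonfun$2].
    println(tryConstruct(Seq(classOf[String], creator))) // prints Left(IllegalArgumentException)
  }
}
```

Calling tryConstruct with a single function argument succeeds; passing the
two-element argument list fails with IllegalArgumentException, the same
exception type that appears at the bottom of the traces.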


*Founder & CEO, Tuplejump, Inc.*
*The Data Engineering Platform*

On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <> wrote:

> The stack trace was from running the ActorWordCount sample directly,
> without a Spark cluster, so I guess the logs cover both the driver and the
> executor? I enabled more logging and got this stack trace:
> 14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
>  14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(alan)
>  14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
> is: off
>  14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
>  14/07/25 17:55:27 [INFO] Remoting: Starting remoting
>  14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
> addresses :[akka.tcp://spark@leungshwingchun:52156]
>  14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
> akka.tcp://spark@leungshwingchun:52156]
>  14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
>  14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
>  14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at
> root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
>  14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
>  14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
> 297.0 MB.
>  14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
> with id = ConnectionManagerId(leungshwingchun,52157)
>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
> BlockManager
>  14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
> leungshwingchun:52157 with 297.0 MB RAM
>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>  14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
>  14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>  14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
>  14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
> http://leungshwingchun:4040
>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of successful kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of failed kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
>  14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
> related metrics
>  2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
> SCDynamicStore
> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
> found, setting default realm to empty
>  14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
> custom-built native-hadoop library...
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
>  14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
> back to shell based
>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
> mapping
>  14/07/25 17:55:27 [DEBUG] Groups: Group mapping
> cacheTimeout=300000
>  14/07/25 17:55:28 [INFO] SparkContext: Added JAR
> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
> at
> with timestamp 1406336128212
>  14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
>  14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
>  14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
> ms
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
>  14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
>  14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
>  14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
>  14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
> ReceiverTracker.scala:275
>  14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
>  14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
> ReceiverTracker.scala:275)
>  14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
>  14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
> has no missing parents
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
> at time 1406336130000
>  14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
> 1406336130000 ms
>  14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
> Set(ResultTask(0, 0))
>  14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
> tasks
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
> TaskSet 0.0: ANY
>  14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
> TaskSet_0, runningTasks: 0
>  14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
> bytes in 8 ms
>  14/07/25 17:55:28 [INFO] Executor: Running task ID 0
>  14/07/25 17:55:28 [INFO] Executor: Fetching
> with
> timestamp 1406336128212
>  14/07/25 17:55:28 [INFO] Utils: Fetching
> to
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
>  14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
>  14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
> directory
> HADOOP_HOME or hadoop.home.dir are not set.
>  at org.apache.hadoop.util.Shell.checkHadoopHome(
>  at org.apache.hadoop.util.Shell.<clinit>(
> at org.apache.hadoop.fs.FileUtil.chmod(
>  at org.apache.hadoop.fs.FileUtil.chmod(
>  at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>  at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>  at
> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>  at org.apache.spark.executor.Executor$
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
>  at
> java.util.concurrent.ThreadPoolExecutor$
>  at
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
> So not using it.
>  14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
>  14/07/25 17:55:28 [INFO] Executor: Adding
> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
> to class loader
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator
> at time 1406336129000
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
>  14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
> initialized at:akka://spark/user/Supervisor0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
>  creating actor of worker now!!!!!!!!!!!!!!!,
> class
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0
> from akka://spark
>  14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
> 0 from akka://spark
>  14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [] and mailbox []
> exception during creation
>  at$.apply(Actor.scala:218)
> at
>  at$1(ActorCell.scala:425)
>  at
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>  at
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>  at
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [] and mailbox []
>  at
>  at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>  at$class.makeChild(Children.scala:191)
> at$class.actorOf(Children.scala:38)
>  at
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at
> at
>  at
>  at
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>  at
> at
>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>  at
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(
>  at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>  at java.lang.reflect.Constructor.newInstance(
>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129000
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129200
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129400
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129600
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129800
>  14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336130000
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <>
> wrote:
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs: the driver logs and, if possible, the executor logs?
> TD
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <> wrote:
>> Bump. Any ideas?
>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <> wrote:
>> It looks like bad things happen when you configure SparkConf to use the
>> KryoSerializer in combination with an ActorReceiver. I modified the
>> ActorWordCount example program from
>>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
>> to
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> and I get the stack trace below. I figured it might be that Kryo doesn't
>> know how to serialize/deserialize the actor, so I added a registrator. I
>> also added a default no-arg constructor to SampleActorReceiver, just for kicks:
>> class SerializationRegistry extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.register(classOf[SampleActorReceiver])
>>   }
>> }
>> …
>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>> extends Actor with ActorHelper {
>>   def this() = this("")
>>   ...
>> }
>> ...
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")
>> None of this worked; I get the same stack trace. Any idea what's going on?
>> Is this a known issue, and is there a workaround?
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [] and mailbox []
>> exception during creation
>>  at$.apply(Actor.scala:218)
>> at
>>  at$1(ActorCell.scala:425)
>>  at
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>  at
>>  at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>  at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>  at
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [] and mailbox []
>>  at
>>  at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>  at$class.makeChild(Children.scala:191)
>> at$class.actorOf(Children.scala:38)
>>  at
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at
>> at
>>  at
>>  at
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>  at
>> at
>>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>  at
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(
>>  at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>>  at java.lang.reflect.Constructor.newInstance(
>>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
