[ 
https://issues.apache.org/jira/browse/SPARK-1785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das closed SPARK-1785.
--------------------------------

    Resolution: Fixed

> Streaming requires receivers to be serializable
> -----------------------------------------------
>
>                 Key: SPARK-1785
>                 URL: https://issues.apache.org/jira/browse/SPARK-1785
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9.0
>            Reporter: Hari Shreedharan
>
> When the ReceiverTracker starts the receivers, it creates a temporary RDD to
> ship the receiver instances over to the workers, where they are then started
> via the startReceivers method.
> This appears to mean that the receivers really do have to be serializable. In
> the case of the Flume receiver, the Avro IPC components are not serializable,
> causing an error that looks like this:
> {code}
> Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted 
> due to stage failure: Task not serializable: 
> java.io.NotSerializableException: 
> org.apache.avro.ipc.specific.SpecificResponder
>       - field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: 
> "responder", type: "class org.apache.avro.ipc.specific.SpecificResponder")
>       - object (class "org.apache.spark.streaming.flume.FlumeReceiver", 
> org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
>       - element of array (index: 0)
>       - array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 
> 1)
>       - field (class "scala.collection.mutable.WrappedArray$ofRef", name: 
> "array", type: "class [Ljava.lang.Object;")
>       - object (class "scala.collection.mutable.WrappedArray$ofRef", 
> WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
>       - field (class "org.apache.spark.rdd.ParallelCollectionPartition", 
> name: "values", type: "interface scala.collection.Seq")
>       - custom writeObject data (class 
> "org.apache.spark.rdd.ParallelCollectionPartition")
>       - object (class "org.apache.spark.rdd.ParallelCollectionPartition", 
> org.apache.spark.rdd.ParallelCollectionPartition@691)
>       - writeExternal data
>       - root object (class "org.apache.spark.scheduler.ResultTask", 
> ResultTask(0, 0))
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
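> For reference, here is a minimal sketch of the failure mode. The receiver
> class and the stand-in responder below are hypothetical (not the actual
> FlumeReceiver code), and ssc is assumed to be an existing StreamingContext:
> {code}
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
>
> // Hypothetical stand-in for Avro's SpecificResponder: any field whose
> // class does not implement java.io.Serializable triggers the failure.
> class FakeResponder
>
> class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
>   private val responder = new FakeResponder  // not serializable
>   def onStart(): Unit = { /* would bind the Avro server using responder */ }
>   def onStop(): Unit = {}
> }
>
> // Roughly what ReceiverTracker does: the receiver instances become the
> // *data* of a temporary RDD, so they must survive Java serialization.
> val receivers = Seq(new SketchReceiver)
> val tempRDD = ssc.sparkContext.makeRDD(receivers, receivers.size)
> tempRDD.foreach(_.onStart())  // fails: Task not serializable
> {code}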
> A way out of this is to send only the class name (or the .class object) to
> the workers in the temporary RDD and have the workers instantiate and start
> the receivers themselves, as sketched below. My analysis may be wrong, but
> if it makes sense, I will submit a PR to fix this.
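> A minimal sketch of that idea (hypothetical, not an actual patch; it assumes
> each receiver class has a public no-arg constructor):
> {code}
> // Ship only class names to the workers: strings serialize trivially.
> val receiverClassNames = receivers.map(_.getClass.getName)
> val tempRDD =
>   ssc.sparkContext.makeRDD(receiverClassNames, receiverClassNames.size)
>
> tempRDD.foreach { className =>
>   // Runs on the worker: the receiver is constructed locally, so its
>   // non-serializable fields (e.g. the Avro responder) never leave the
>   // JVM they were created in. Requires a public no-arg constructor.
>   val receiver =
>     Class.forName(className).newInstance().asInstanceOf[Receiver[_]]
>   receiver.onStart()  // stand-in for the real startReceivers bookkeeping
> }
> {code}
> (Receivers that need constructor arguments would instead require shipping a
> serializable factory, which is a variation on the same idea.)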



--
This message was sent by Atlassian JIRA
(v6.2#6252)