[ https://issues.apache.org/jira/browse/SPARK-1785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tathagata Das closed SPARK-1785.
--------------------------------
    Resolution: Fixed

> Streaming requires receivers to be serializable
> -----------------------------------------------
>
>                 Key: SPARK-1785
>                 URL: https://issues.apache.org/jira/browse/SPARK-1785
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9.0
>            Reporter: Hari Shreedharan
>
> When the ReceiverTracker starts the receivers, it creates a temporary RDD to
> send the receivers over to the workers. They are then started on the workers
> using the startReceivers method.
> It looks like this means the receivers really have to be serializable. In the
> case of the Flume receiver, the Avro IPC components are not serializable,
> causing an error that looks like this:
> {code}
> Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.avro.ipc.specific.SpecificResponder
>     - field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: "responder", type: "class org.apache.avro.ipc.specific.SpecificResponder")
>     - object (class "org.apache.spark.streaming.flume.FlumeReceiver", org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
>     - element of array (index: 0)
>     - array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
>     - field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
>     - object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
>     - field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
>     - custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
>     - object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
>     - writeExternal data
>     - root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> A way out of this is to simply send the class name (or .class) to the workers
> in the tempRDD and have the workers instantiate and start the receiver.
> My analysis may be wrong, but if it makes sense, I will submit a PR to fix
> this.
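> For illustration, here is a minimal, self-contained sketch of that idea
> (FakeReceiver and StartReceiversSketch are hypothetical stand-ins, not the
> actual ReceiverTracker or FlumeReceiver code): the temp RDD carries only a
> class name plus serializable constructor arguments, and the receiver itself
> is instantiated and started on the worker, so its non-serializable internals
> never have to be shipped.
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Stand-in for a receiver that holds a non-serializable resource, analogous
> // to FlumeReceiver's Avro SpecificResponder. It is only ever constructed on
> // the worker, so it never needs to be serialized.
> class FakeReceiver(host: String, port: Int) {
>   private val responder = new Object()  // pretend this is not serializable
>   def start(): Unit = println(s"receiver listening on $host:$port")
> }
>
> object StartReceiversSketch {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setMaster("local[2]").setAppName("receiver-sketch"))
>
>     // Ship only a class name and plain, serializable constructor arguments.
>     val receiverSpecs = Seq((classOf[FakeReceiver].getName, "localhost", 9999))
>     val tempRDD = sc.makeRDD(receiverSpecs, receiverSpecs.size)
>
>     // This closure runs on the workers: reflectively instantiate the
>     // receiver there and start it.
>     tempRDD.foreach { case (className, host, port) =>
>       val receiver = Class.forName(className)
>         .getConstructor(classOf[String], classOf[Int])
>         .newInstance(host, Int.box(port))
>         .asInstanceOf[FakeReceiver]
>       receiver.start()
>     }
>
>     sc.stop()
>   }
> }
> {code}
> The same shape would presumably carry over to the real receivers, since their
> constructor arguments (host, port, storage level, etc.) are serializable even
> when the receiver instance itself is not.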