Hi, I have some quick/dirty code here running in Spark 1.0.0 (CDH 5.1, Spark in Yarn cluster mode)
import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.kafka._ import kafka.producer._ import java.util.Properties def toKafka(str: String) { val props = new Properties() props.put("metadata.broker.list", "host1:9092,host2:902") props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("producer.type", "async") props.put("request.required.acks", "1") val topic = "normstruct" val config = new ProducerConfig(props) val producer = new Producer[String, String](config) val kOutMsg = new KeyedMessage[String,String](topic,str) producer.send(kOutMsg) } val ssc = new StreamingContext(sc,Seconds(5)) val kInMsg = KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","normApp",Map("rawunstruct" -> 1)) kInMsg.foreach(rdd => { rdd.foreach(e => toKafka(e._2)) }) ssc.start() Throws: 14/08/12 00:47:25 INFO DAGScheduler: Failed to run foreach at <console>:31 14/08/12 00:47:25 ERROR JobScheduler: Error running job streaming job 1407804445000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/08/12 00:47:25 INFO ShuffleBlockManager: Could not find files for shuffle 0 for deleting 14/08/12 00:47:25 INFO ContextCleaner: Cleaned shuffle 0 Thanks, Xuri --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org