Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-10-31 Thread Sean Owen
Hm, now I am also seeing this problem.

The essence of my code is:

final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
  }
};

streamingContext = JavaStreamingContext.getOrCreate(
    checkpointDirString, sparkContext.hadoopConfiguration(),
    streamingContextFactory, false);
streamingContext.checkpoint(checkpointDirString);

yields:

2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
org.apache.hadoop.conf.Configuration

- field (class 
org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9,
name: conf$2, type: class org.apache.hadoop.conf.Configuration)

- object (class
org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9,
function2)

- field (class org.apache.spark.streaming.dstream.ForEachDStream,
name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc,
type: interface scala.Function2)

- object (class org.apache.spark.streaming.dstream.ForEachDStream,
org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
...


This looks like it's due to PairDStreamFunctions, as this saveFunc seems
to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:

def saveAsNewAPIHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = new Configuration
  ) {
  val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
  }
  self.foreachRDD(saveFunc)
}


conf is indeed captured in saveFunc and so has to be serialized along
with it, no? But a Hadoop Configuration can't be serialized.
Yet surely this doesn't fail all the time, or someone would have
noticed by now...

It could be a particular closure problem again.

Any ideas on whether this is a real problem, or if there's a workaround?
Checkpointing does not work at all for me as a result.
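
One workaround that comes to mind (just a sketch, untested; it reuses the
dStream / hdfsDataUrl names from the code quoted below) is to skip
saveAsNewAPIHadoopFiles and do the save in foreachRDD, looking up the Hadoop
Configuration from the RDD's SparkContext at batch time, so that no
Configuration object is captured in the DStream graph that checkpointing
serializes:

  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
  import org.apache.spark.SparkContext._          // pair-RDD implicits (Spark 1.x)
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.Time

  // Sketch only: save each batch by hand. rdd.context.hadoopConfiguration is
  // looked up when the batch runs, so the closure itself holds no Configuration.
  dStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
    val file = hdfsDataUrl + "-" + time.milliseconds + ".csv"  // roughly what rddToFileName builds
    rdd.saveAsNewAPIHadoopFile(file, classOf[String], classOf[String],
      classOf[TextOutputFormat[String, String]],
      rdd.context.hadoopConfiguration)
  }

I haven't verified that this dodges the checkpoint serialization; the closure
still gets written into the checkpoint, but it should only capture the path
prefix string.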

On Fri, Aug 15, 2014 at 10:37 PM, salemi alireza.sal...@udo.edu wrote:
 Hi All,

 I am just trying to save the Kafka DStream to Hadoop as follows:

   val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
   dStream.saveAsHadoopFiles(hdfsDataUrl, "data")

 It throws the following exception. What am I doing wrong?

 14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
 java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
 at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   

Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread Sean Owen
Somewhere, your function has a reference to the Hadoop JobConf object
and is trying to send that to the workers. It's not in the code you
pasted, so it must come from something slightly different?

It shouldn't need to send that around, and in fact it can't be
serialized, as you see. If you need a Hadoop Configuration object, you
can get it from the SparkContext, which you can get from the
StreamingContext.
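
Something like this is what I mean (a minimal sketch, assuming ssc is your
StreamingContext and dStream / hdfsDataUrl are the values from your snippet):

  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
  import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits (Spark 1.x)

  // Reuse the Hadoop Configuration already held by the SparkContext instead of
  // constructing a JobConf of your own.
  val hadoopConf = ssc.sparkContext.hadoopConfiguration
  dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv",
    classOf[String], classOf[String],
    classOf[TextOutputFormat[String, String]],
    hadoopConf)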

On Fri, Aug 15, 2014 at 9:37 PM, salemi alireza.sal...@udo.edu wrote:
 Hi All,

 I am just trying to save the Kafka DStream to Hadoop as follows:

   val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
   dStream.saveAsHadoopFiles(hdfsDataUrl, "data")

 It throws the following exception. What am I doing wrong?

 14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
 java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
 at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
 at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 ^C14/08/15 14:30:10 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
 java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 

Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread salemi
Look, this is the whole program. I am not trying to serialize the JobConf.
 
def main(args: Array[String]) {
    try {
      val properties = getProperties("settings.properties")
      StreamingExamples.setStreamingLogLevels()
      val zkQuorum = properties.get("zookeeper.list").toString()
      val topic = properties.get("topic.name").toString()
      val group = properties.get("group.name").toString()
      val threads = properties.get("consumer.threads").toString()
      val topicpMap = Map(topic -> threads.toInt)
      val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
      val hdfsCheckPointUrl = hdfsNameNodeUrl +
        properties.get("hdfs.checkpoint.path").toString()
      val hdfsDataUrl = hdfsNameNodeUrl +
        properties.get("hdfs.data.path").toString()
      val checkPointInterval =
        properties.get("spark.streaming.checkpoint.interval").toString().toInt
      val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
      println("===")
      println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic +
        "; group: " + group + "; threads: " + threads)
      println("===")
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint(hdfsCheckPointUrl)
      val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      dStream.checkpoint(Seconds(checkPointInterval))
      dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String],
        classOf[String], classOf[TextOutputFormat[String, String]],
        ssc.sparkContext.hadoopConfiguration)

      val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
        DataObject(data(0), data(1), data(2), data(3), data(4), data(5), data(6),
          data(7), data(8).toLong, data(9), data(10), data(11), data(12).toLong,
          data(13), data(14)))
      val count = eventData.filter(_.state ==
        "COMPLETE").countByWindow(Minutes(15), Seconds(1))
      count.map(cnt => "the Total count of calls in complete state in the " +
        "last 15 minutes is: " + cnt).print()
      ssc.start()
      ssc.awaitTermination()

    } catch {
      case e: Exception => println("exception caught: " + e)
    }
  }






Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread salemi
If I reduce the app to the following code, I don't see the exception. It
creates the Hadoop files, but they are empty! The DStream doesn't get written
out to the files!

 def main(args: Array[String]) {
    try {
      val properties = getProperties("settings.properties")
      StreamingExamples.setStreamingLogLevels()
      val zkQuorum = properties.get("zookeeper.list").toString()
      val topic = properties.get("topic.name").toString()
      val group = properties.get("group.name").toString()
      val threads = properties.get("consumer.threads").toString()
      val topicpMap = Map(topic -> threads.toInt)
      val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
      val hdfsCheckPointUrl = hdfsNameNodeUrl +
        properties.get("hdfs.checkpoint.path").toString()
      val hdfsDataUrl = hdfsNameNodeUrl +
        properties.get("hdfs.data.path").toString()
      val checkPointInterval =
        properties.get("spark.streaming.checkpoint.interval").toString().toInt
      val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
      println("===")
      println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic +
        "; group: " + group + "; threads: " + threads)
      println("===")
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String],
        classOf[String], classOf[TextOutputFormat[String, String]],
        ssc.sparkContext.hadoopConfiguration)

      ssc.start()
      ssc.awaitTermination()

    } catch {
      case e: Exception => println("exception caught: " + e)
    }
  }


