Thanks Shixiong. Sure, please find the details below.

Spark version: 1.5.2

I am doing stateful data aggregation using checkpointing; I am not sure if that is causing the issue. Also, I am using a Perl Kafka producer to push data to Kafka, and my Spark program then reads it from Kafka. Should I be using the createStream function instead of createDirectStream?
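In case I do end up switching to the receiver-based API: as far as I understand, KafkaUtils.createStream is the variant that accepts a StorageLevel. A rough sketch of what I would try (the ZooKeeper quorum, consumer group name, and per-topic thread count are placeholders for my setup; ssc and topicsSet are the same values as in my program below):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // receiver-based stream; zkQuorum and the consumer group name are placeholders
    val zkQuorum = "<zookeeper-host>:2181"
    val topicMap = topicsSet.map(topic => (topic, 1)).toMap  // topic -> number of receiver threads
    val receiverStream = KafkaUtils.createStream(
      ssc, zkQuorum, "my-consumer-group", topicMap,
      StorageLevel.MEMORY_AND_DISK_SER)
    val lines = receiverStream.map(_._2)  // message payloads, like messages.map(_._2)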
My program:

def main(args: Array[String]): Unit = {
  val checkpointDirectory = "hdfs://<hostname>:8020/user/spark/" + args(2)

  // Function to create and set up a new StreamingContext
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(1))

    val brokers = args(0)
    val topics = args(1)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // parse the lines of data into coverage objects
    val inputStream = messages.map(_._2)
    ssc.checkpoint(checkpointDirectory)
    inputStream.print(1)

    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })

    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    // keep a running element-wise sum of the counts per key across batches
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }

  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  context.start()
  context.awaitTermination()
}

Thanks & Regards,
Vinti

On Wed, Mar 2, 2016 at 10:28 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Hey,
>
> KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
> store blocks to BlockManager. However, the error is not related
> to StorageLevel. It may be a bug. Could you provide more info about it?
> E.g., Spark version, your codes, logs.
>
> On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>
>> Hi All,
>>
>> I wanted to set StorageLevel.MEMORY_AND_DISK_SER in my spark-streaming
>> program as currently I am getting MetadataFetchFailedException. I am not
>> sure where I should pass StorageLevel.MEMORY_AND_DISK, as it seems like
>> createDirectStream doesn't allow passing that parameter.
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>> Full Error:
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
>>   at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>>   at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>   at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>>   at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Thanks,
>> ~Vinti
>>
>