Hey,

KafkaUtils.createDirectStream doesn't take a StorageLevel because it doesn't
store blocks in the BlockManager. However, the error you posted is not related
to StorageLevel; it may be a bug. Could you provide more info about it,
e.g., the Spark version, your code, and the logs?
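
If you do want the direct stream's data cached at a particular level, you can
persist the resulting DStream yourself. A rough, untested sketch, reusing the
ssc, kafkaParams and topicsSet from your snippet:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The direct API has no receiver: each batch reads records straight
    // from Kafka when its tasks run, so there is no StorageLevel argument.
    val messages = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    // If the stream is reused by several actions and you want it cached,
    // set the level explicitly on the DStream:
    messages.persist(StorageLevel.MEMORY_AND_DISK_SER)

(The receiver-based KafkaUtils.createStream is the variant that accepts a
StorageLevel, because its receiver writes blocks to the BlockManager.) But as
noted above, MetadataFetchFailedException is usually not caused by the storage
level, so this alone probably won't fix it.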

On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Hi All,
>
> I wanted to set StorageLevel.MEMORY_AND_DISK_SER in my spark-streaming
> program, as currently I am getting MetadataFetchFailedException. I am not
> sure where I should pass StorageLevel.MEMORY_AND_DISK_SER, since it seems
> that createDirectStream doesn't allow passing that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> Full Error:
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0
>     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>     at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>     at
> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>     at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks,
> ~Vinti
>
