I am new to Spark and am trying to implement Spark Streaming from a
Kafka topic.

It worked fine for a while, but some time later it started throwing the
error below. I have no clue what is causing the issue.

java.lang.Exception: Could not compute split, block input-0-1412045981400 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Here is my code:

JavaStreamingContext jsc = AnalyticsContext.getInstance().getSparkStreamContext();
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, noOfWorkerThreads);
Map<String, String> kafkaParams = MessageSessionFactory.getConsumerConfigParamsMap(
        MessageSessionFactory.DEFAULT_CLUSTER_IDENTITY, consumerGroup);
metricStream = KafkaUtils.createStream(jsc,
        String.class, String.class, ObjectDecoder.class, ObjectDecoder.class,
        kafkaParams, topicMap, StorageLevel.MEMORY_ONLY());
