[ 
https://issues.apache.org/jira/browse/SPARK-22012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karan Singh updated SPARK-22012:
--------------------------------
    Description: 
My Spark Streaming duration is 5 seconds (5000) and kafka is all at its default 
properties , i am still facing this issue , Can anyone tell me how to resolve 
it what i am doing wrong ?
                        JavaStreamingContext ssc = new JavaStreamingContext(sc, 
new Duration(5000));


Exception in Spark Streamings
Job aborted due to stage failure: Task 6 in stage 289.0 failed 4 times, most 
recent failure: Lost task 6.3 in stage 289.0 (xxxxxx, executor 2): 
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-executor-xxx pulse 1 163684030 after polling for 512
        at scala.Predef$.assert(Predef.scala:170)
        at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
        at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at 
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
        at 
com.olacabs.analytics.engine.EngineManager$1$1.call(EngineManager.java:165)
        at 
com.olacabs.analytics.engine.EngineManager$1$1.call(EngineManager.java:132)
        at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

  was:
We have a Spark Streaming application reading records from Kafka 0.10.

Some tasks are failed because of the following error:
"java.lang.AssertionError: assertion failed: Failed to get records for (...) 
after polling for 512"

The first attempt fails and the second attempt (retry) completes successfully, 
- this is the pattern that we see for many tasks in our logs. These fails and 
retries consume resources.

A similar case with a stack trace are described here:
https://www.mail-archive.com/user@spark.apache.org/msg56564.html
https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767

Here is the line from the stack trace where the error is raised:
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)

We tried several values for "spark.streaming.kafka.consumer.poll.ms", - 2, 5, 
10, 30 and 60 seconds, but the error appeared in all the cases except the last 
one. Moreover, increasing the threshold led to increasing total Spark stage 
duration.
In other words, increasing "spark.streaming.kafka.consumer.poll.ms" led to 
fewer task failures but with cost of total stage duration. So, it is bad for 
performance when processing data streams.

We have a suspicion that there is a bug in CachedKafkaConsumer (and/or other 
related classes) which inhibits the reading process.



> CLONE - Spark Streaming, Kafka receiver, "Failed to get records for ... after 
> polling for 512"
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22012
>                 URL: https://issues.apache.org/jira/browse/SPARK-22012
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>         Environment: Apache Spark 2.0.0, Kafka 0.10 for Scala 2.11
>            Reporter: Karan Singh
>
> My Spark Streaming duration is 5 seconds (5000) and kafka is all at its 
> default properties , i am still facing this issue , Can anyone tell me how to 
> resolve it what i am doing wrong ?
>                       JavaStreamingContext ssc = new JavaStreamingContext(sc, 
> new Duration(5000));
> Exception in Spark Streamings
> Job aborted due to stage failure: Task 6 in stage 289.0 failed 4 times, most 
> recent failure: Lost task 6.3 in stage 289.0 (xxxxxx, executor 2): 
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-xxx pulse 1 163684030 after polling for 512
>       at scala.Predef$.assert(Predef.scala:170)
>       at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>       at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>       at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>       at 
> scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
>       at 
> com.olacabs.analytics.engine.EngineManager$1$1.call(EngineManager.java:165)
>       at 
> com.olacabs.analytics.engine.EngineManager$1$1.call(EngineManager.java:132)
>       at 
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
>       at 
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to