[ https://issues.apache.org/jira/browse/SPARK-11693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004072#comment-15004072 ]

Cody Koeninger commented on SPARK-11693:
----------------------------------------

You've under-provisioned Kafka storage and/or Spark compute capacity. The 
result is that data is being deleted before it has been processed.
I personally think the proper response to a broken system is for it to fail 
in an obvious, noticeable way, rather than silently give the wrong result.
My recommended way to handle this is to monitor your stream and have a 
restart policy that's appropriate for your situation.
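A minimal sketch of that restart policy, done in the driver itself, assuming 
a failed batch eventually surfaces its error through awaitTermination(); the 
object name, batch interval, and back-off below are hypothetical, and the 
stream-building logic is elided:

{noformat}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SupervisedStream {
  // Hypothetical factory: builds a fresh context plus the Kafka direct
  // stream and its output operations (elided here).
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("supervised-kafka-stream")
    val ssc = new StreamingContext(conf, Seconds(10))
    // ... KafkaUtils.createDirectStream(...) and processing go here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    var stoppedCleanly = false
    while (!stoppedCleanly) {
      val ssc = createContext()
      ssc.start()
      try {
        ssc.awaitTermination()  // rethrows the error that killed the stream
        stoppedCleanly = true   // clean shutdown, stop supervising
      } catch {
        case e: Exception =>
          // e.g. kafka.common.OffsetOutOfRangeException surfacing as a
          // failed job: tear everything down, back off, start over.
          ssc.stop(stopSparkContext = true, stopGracefully = false)
          Thread.sleep(30000)
      }
    }
  }
}
{noformat}

On restart, a direct stream with no checkpoint or explicit fromOffsets picks 
its starting point from auto.offset.reset, so the deleted range is skipped 
rather than retried.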

If you want to modify the area of the code you noted so that it silently 
catches the exception and restarts from the next available offset, you can 
do so fairly straightforwardly (streaming-kafka is an external module, so 
you shouldn't have to re-deploy all of Spark). I don't think that 
modification makes sense for the general use case, however; a sketch of the 
idea follows.
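For illustration only, here's the shape of that change, assuming the Kafka 
0.8 SimpleConsumer API that the 1.5.x streaming-kafka module uses 
internally. The class and fields below (ResettingFetcher, requestOffset) are 
hypothetical stand-ins for the corresponding pieces of 
KafkaRDD$KafkaRDDIterator (handleFetchErr, KafkaRDD.scala:184 in the trace 
below), not a drop-in patch:

{noformat}
import kafka.api.OffsetRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer

class ResettingFetcher(consumer: SimpleConsumer, topic: String, partition: Int) {
  // Offset the iterator will request next; normally advanced per message.
  var requestOffset: Long = 0L

  def handleFetchErr(errCode: Short): Unit = {
    if (errCode == ErrorMapping.OffsetOutOfRangeCode) {
      // Instead of failing the task, jump forward to the smallest offset
      // the broker still retains for this partition.
      val tap = TopicAndPartition(topic, partition)
      requestOffset = consumer.earliestOrLatestOffset(
        tap, OffsetRequest.EarliestTime, -1)
    } else if (errCode != ErrorMapping.NoError) {
      // Preserve the stock behavior for every other broker error.
      throw ErrorMapping.exceptionFor(errCode)
    }
  }
}
{noformat}

Everything between the stale requestOffset and the broker's smallest 
retained offset is dropped silently, which is exactly the failure mode 
argued against above.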

> spark kafka direct streaming exception
> --------------------------------------
>
>                 Key: SPARK-11693
>                 URL: https://issues.apache.org/jira/browse/SPARK-11693
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.5.1
>            Reporter: xiaoxiaoluo
>            Priority: Minor
>
> We are using Spark Kafka direct streaming in our test environment. We have 
> limited the Kafka partition size to avoid exhausting disk space, so when 
> data is written to Kafka faster than Spark Streaming can read it, offsets 
> that have not yet been processed get deleted. Spark Streaming then throws 
> an exception and the application shuts down.
> {noformat}
> 15/11/11 10:17:35 ERROR Executor: Exception in task 0.3 in stage 1626659.0 (TID 1134180)
> kafka.common.OffsetOutOfRangeException
>       at sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown Source)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>       at java.lang.Class.newInstance(Class.java:442)
>       at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>       at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/11/11 10:17:42 ERROR CoarseGrainedExecutorBackend: Driver 10.1.92.44:49939 disassociated! Shutting down.
> {noformat}
> Could streaming get the current smallest offset for this partition and go 
> on processing the stream?
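On the final question above: in the 1.5.x direct stream, auto.offset.reset 
only controls where consumption starts when no explicit fromOffsets are 
supplied, so "smallest" begins at the oldest offset the broker still 
retains; it does not reset a running stream. A minimal sketch (e.g. for 
spark-shell), assuming Spark 1.5.x and Kafka 0.8, with placeholder broker 
and topic names:

{noformat}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-direct-from-smallest")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092", // placeholder broker address
  "auto.offset.reset" -> "smallest")        // start at the oldest retained offset

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test.topic"))      // placeholder topic name
{noformat}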



