[ https://issues.apache.org/jira/browse/SPARK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089306#comment-15089306 ]

Cody Koeninger commented on SPARK-12693:
----------------------------------------

If an executor is given a range of Kafka offsets that no longer exist in 
Kafka, it's going to error.

It doesn't matter whether this is caused by you setting Kafka retention 
parameters several orders of magnitude smaller than the default, deleting 
offsets from Kafka between the time the driver created an RDD and the 
executor started to compute it, trying to restart from a checkpoint that has 
old offsets, manually creating an RDD for offsets that don't exist, or 
anything else.  If the data no longer exists, the only reasonable thing to 
do is error.  You don't really need to experiment with all the different 
ways you can make this happen; the documentation says explicitly that you 
need adequate Kafka retention.
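The point above can be sketched without Spark at all. This is an illustrative model only; the names `OffsetRange`, `PartitionWindow`, and `validate` are hypothetical, not Spark's or Kafka's actual API. Given the broker's currently retained window for a partition, a requested range that begins before the earliest retained offset can only fail:

```scala
// Illustrative sketch: why an executor must error when the requested
// offsets have already been deleted by retention. All names hypothetical.
case class OffsetRange(fromOffset: Long, untilOffset: Long)

// The broker's currently retained window for a partition.
case class PartitionWindow(earliest: Long, latest: Long)

def validate(requested: OffsetRange,
             available: PartitionWindow): Either[String, OffsetRange] =
  if (requested.fromOffset < available.earliest)
    // The data is gone; silently retrying from a later offset would lose it.
    Left(s"Offsets ${requested.fromOffset}..${available.earliest} " +
         "no longer exist (OffsetOutOfRange)")
  else if (requested.untilOffset > available.latest)
    Left(s"Offset ${requested.untilOffset} is past the log end " +
         s"${available.latest}")
  else
    Right(requested)
```

Anything that shrinks the retained window below the requested range, whatever the cause, lands in the first branch.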

Regarding your most recent comment: as the documentation says, 'If __not__ 
starting from a checkpoint, "auto.offset.reset" may be set to "largest" or 
"smallest" to determine where the stream starts'.  You *are* starting from a 
checkpoint, so you're already specifying where you want the stream to start, 
and if those offsets no longer exist, it's not going to work.
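To make that concrete, here is a sketch of the relevant configuration for the 1.6-era direct stream (the broker address is a placeholder). This map is what gets passed to KafkaUtils.createDirectStream on a fresh start; when the context is recovered from a checkpoint, the checkpointed offsets win and "auto.offset.reset" is never consulted:

```scala
// Sketch only: broker address is a placeholder.
// Consulted on a fresh start; ignored on checkpoint recovery.
val kafkaParams = Map(
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset"    -> "smallest"
)
```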

As I said, changing this would hurt correctness (automatically retrying with 
different offsets would mean silently losing data) and performance (the count 
optimizations assume that the messages specified in the offset range are 
actually the messages that will be processed), so I don't see it changing.  
It's your job to make sure Kafka has the data you're asking for.



> OffsetOutOfRangeException caused by retention
> ---------------------------------------------
>
>                 Key: SPARK-12693
>                 URL: https://issues.apache.org/jira/browse/SPARK-12693
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: Ubuntu 64bit, Intel i7
>            Reporter: Rado Buransky
>            Priority: Minor
>              Labels: kafka
>         Attachments: kafka-log.txt, log.txt
>
>
> I am running a Kafka server locally with an extremely low retention of 3 
> seconds and with 1 second segmentation. I create a direct Kafka stream with 
> auto.offset.reset = smallest. 
> With bad luck (which actually happens quite often in my case), the smallest 
> offset retrieved during stream initialization no longer exists by the time 
> streaming actually starts.
> Complete source code of the Spark Streaming application is here:
> https://github.com/pygmalios/spark-checkpoint-experience/blob/cb27ab83b7a29e619386b56e68a755d7bd73fc46/src/main/scala/com/pygmalios/sparkCheckpointExperience/spark/SparkApp.scala
> The application ends up in an endless loop trying to fetch that non-existent 
> offset and has to be killed. See the attached logs from Spark and from the 
> Kafka server.
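For reference, retention this aggressive corresponds to broker settings roughly like the following (property names assume a 0.8.2/0.9-era server.properties; the values are inferred from the description above):

```properties
# ~3 second retention; roll a new log segment every second
log.retention.ms=3000
log.roll.ms=1000
```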



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
