[ https://issues.apache.org/jira/browse/SPARK-11211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971834#comment-14971834 ]

Cody Koeninger commented on SPARK-11211:
----------------------------------------

In an attempt to replicate, I used the following:
- spark 1.5.1
- a kafka broker with pathologically small retention and frequent log retention
  checks / deletion:
    log.retention.bytes=2048
    log.segment.bytes=1024
    log.retention.check.interval.ms=30000
- the IdempotentExample job from https://github.com/koeninger/kafka-exactly-once,
  which has a batch size of 60 seconds (a simplified sketch of its stream setup
  is below this list)
- a simple 1 msg / sec load sent from the console producer, e.g.
    for i in {1..10000}; do echo "`date +%s`"; sleep 1; done | \
      bin/kafka-console-producer.sh --broker-list 'localhost:9092' --topic test
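
For reference, the relevant part of that job's stream setup is more or less the
following. This is a simplified sketch, not a copy of the repo's code; ssc is a
placeholder for the StreamingContext, already constructed with a 60 second
batch interval:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// direct stream against the "test" topic, with auto.offset.reset=smallest
val kafkaParams = Map(
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "smallest")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test"))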


This results in (at least) the first batch failing, as I'd expect, since kafka 
messages are being deleted before they can be processed. I can see the failed 
batch in the master UI, and there actually is an exception visible on the 
driver (although it's a ClassNotFoundException caused by deserializing the 
OffsetOutOfRangeException; there's a separate ticket, SPARK-11195, related to 
that).
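
If you want to react to that failure from the driver, wrapping the output
action inside foreachRDD in a try/catch is one way to do it. This is a rough
sketch, not code from the example job, and modulo the deserialization issue in
SPARK-11195 what you catch may wrap a ClassNotFoundException rather than the
OffsetOutOfRangeException itself:

stream.foreachRDD { rdd =>
  try {
    // the action is what actually triggers the kafka fetch; a task that hits
    // OffsetOutOfRangeException fails the job, and the failure surfaces here
    rdd.foreachPartition { iter => iter.foreach(println) }
  } catch {
    case e: org.apache.spark.SparkException =>
      // the messages are already gone from kafka at this point; all you can
      // do is log / alert / stop, you can't get the data back
      println(s"batch failed: $e")
  }
}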

This is what I see in the logs on the executor:

15/10/23 15:16:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 0 offsets 4727 -> 4867
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 1 offsets 2900 -> 2973
15/10/23 15:16:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
kafka.common.OffsetOutOfRangeException

// (repeats total of 4 times, then that job fails)

15/10/23 15:20:31 INFO KafkaRDD: Computing topic test, partition 0 offsets 4867 -> 4926
15/10/23 15:20:31 INFO KafkaRDD: Beginning offset 2973 is the same as ending offset skipping test 1

You'll notice there isn't any gap in offsets between the batch that failed and 
the next batch: the failed batch covered partition 0 offsets 4727 -> 4867, and 
the next batch starts at exactly 4867. It's not that the consumer is suddenly 
skipping to the largest value on the kafka topic; it's that the first batch 
failed.

This is a situation where the underlying assumption (i.e. that you have enough 
kafka retention) has gone wrong, so in my opinion the best thing to do is fail.

I'm not sure exactly what you'd expect to happen in this case.  Silently 
submitting another batch that duplicates some smaller portion of the first 
batch seems like a really bad idea. I'd like to know when I've lost data, and 
I'd like separate batches to contain strictly separate groups of messages.  
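
Which batch covered which offsets is easy to see with the direct stream, since
each batch's RDD exposes its exact offset ranges; a minimal sketch of logging
them:

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // each batch is a contiguous, non-overlapping offset range per partition
  for (o <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}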

If you want to reduce the number of messages that can be lost in this kind of 
situation, use maxMessagesPerPartition and / or smaller batch sizes.  But in 
any case, you've lost data because Kafka deleted it.  Changing spark behavior 
won't get you your data back.
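
Concretely, that's just a rate limit plus a shorter batch interval when the
context is created. A rough sketch; the numbers are made up, tune them to your
retention, and if I remember right the user-facing config behind that limit is
spark.streaming.kafka.maxRatePerPartition:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// cap the messages fetched per second per partition, and use 10 second
// batches, so any single failed batch covers fewer offsets
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
val ssc = new StreamingContext(conf, Seconds(10))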

If I'm misunderstanding the nature of the problem, please report the version of 
spark you're using, an actual observation of the behavior (e.g. log output), 
and the behavior you expect.  What you're reporting is more of a (probably 
inaccurate) theory as to the root cause than an observed behavior.

> Kafka - offsetOutOfRange forces to largest
> ------------------------------------------
>
>                 Key: SPARK-11211
>                 URL: https://issues.apache.org/jira/browse/SPARK-11211
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1, 1.5.1
>            Reporter: Daniel Strassler
>
> This problem relates to how DStreams using the Direct Approach of connecting 
> to a Kafka topic behave when they request an offset that does not exist on 
> the topic.  Currently it appears the "auto.offset.reset" configuration value 
> is being ignored and the default value of “largest” is always being used.
>  
> When using the Direct Approach of connecting to a Kafka topic using a 
> DStream, even if you have the Kafka configuration "auto.offset.reset" set to 
> smallest, the behavior in the event of a 
> kafka.common.OffsetOutOfRangeException is to move the next offset to be 
> consumed to the largest value on the Kafka topic.  It also appears that the 
> exception is being eaten and not propagated up to the driver, so a 
> workaround triggered by the propagation of the error cannot be implemented 
> either.
>  
> The current behavior of setting to largest means that any data on the Kafka 
> topic at the time the exception is thrown is skipped (lost), and only data 
> produced to the topic after the exception will be consumed.  Two possible 
> fixes are listed below.
>  
> Fix 1:  When “auto.offset.reset" is set to “smallest”, the DStream should set 
> the next consumed offset to be the smallest offset value on the Kafka topic.
>  
> Fix 2:  Propagate the error to the Driver to allow it to react as it deems 
> appropriate.


