[ https://issues.apache.org/jira/browse/SPARK-11211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971834#comment-14971834 ]
Cody Koeninger commented on SPARK-11211:
----------------------------------------

In an attempt to replicate, I used the following:

- Spark 1.5.1
- a Kafka broker with pathologically small retention and frequent log check / deletion:

    log.retention.bytes=2048
    log.segment.bytes=1024
    log.retention.check.interval.ms=30000

- the IdempotentExample job from https://github.com/koeninger/kafka-exactly-once, which has a batch size of 60 seconds
- a simple 1 msg/sec stream sent from the console producer, e.g.

    for i in {1..10000}; do echo "`date +%s`"; sleep 1; done | \
      bin/kafka-console-producer.sh --broker-list 'localhost:9092' --topic test

This results in (at least the first batch) failing, as I'd expect, since Kafka messages are being deleted before they can be processed. I can see the failed batch in the master UI, and there actually is an exception visible on the driver (although it's a ClassNotFoundException caused by deserializing the OffsetOutOfRangeException; there's a separate ticket, SPARK-11195, related to that).

This is what I see in the logs on the executor:

    15/10/23 15:16:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
    15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 0 offsets 4727 -> 4867
    15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 1 offsets 2900 -> 2973
    15/10/23 15:16:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    kafka.common.OffsetOutOfRangeException
    // (repeats a total of 4 times, then that job fails)
    15/10/23 15:20:31 INFO KafkaRDD: Computing topic test, partition 0 offsets 4867 -> 4926
    15/10/23 15:20:31 INFO KafkaRDD: Beginning offset 2973 is the same as ending offset skipping test 1

You'll notice there isn't any gap in offsets between the batch that failed and the next batch. It's not that the consumer is suddenly skipping to the largest value on the Kafka topic; it's that the first batch failed.
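The contiguity point can be checked mechanically. A minimal sketch in plain Python, using the partition 0 offset ranges taken from the executor log above (half-open ranges, as KafkaRDD logs them):

```python
# Offset ranges for topic "test", partition 0, taken from the executor log
# above. Each entry is (fromOffset, untilOffset), half-open.
batches = [
    (4727, 4867),  # first batch: failed with OffsetOutOfRangeException
    (4867, 4926),  # next batch, scheduled after the failure
]

# The next batch starts exactly where the failed one ended, so the stream
# did not jump ahead to the largest offset on the topic.
for (_, until_prev), (from_next, _) in zip(batches, batches[1:]):
    assert from_next == until_prev, "gap or overlap between batch offset ranges"

print("offset ranges are contiguous")  # -> offset ranges are contiguous
```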
This is a situation where the underlying assumption (i.e. that you have enough Kafka retention) has gone wrong, so the best thing to do, in my opinion, is fail. I'm not sure exactly what you'd expect to happen in this case. Silently submitting another batch that duplicates some smaller portion of the first batch seems like a really bad idea. I'd like to know when I've lost data, and I'd like separate batches to contain strictly separate groups of messages. If you want to reduce the number of messages that can be lost in this kind of situation, use maxMessagesPerPartition and/or smaller batch sizes. But in any case, you've lost data because Kafka deleted it. Changing Spark's behavior won't get your data back.

If I'm misunderstanding the nature of the problem, please report the version of Spark you're using, an actual observation of behavior (e.g. log output), and the behavior you expect. What you're reporting is more of a (probably inaccurate) theory as to the root cause than observed behavior.

> Kafka - offsetOutOfRange forces to largest
> ------------------------------------------
>
> Key: SPARK-11211
> URL: https://issues.apache.org/jira/browse/SPARK-11211
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.1, 1.5.1
> Reporter: Daniel Strassler
>
> This problem relates to how DStreams using the Direct Approach of connecting
> to a Kafka topic behave when they request an offset that does not exist on
> the topic. Currently it appears the "auto.offset.reset" configuration value
> is being ignored and the default value of "largest" is always being used.
>
> When using the Direct Approach of connecting to a Kafka topic using a
> DStream, even if you have the Kafka configuration "auto.offset.reset" set to
> "smallest", the behavior in the event of a
> kafka.common.OffsetOutOfRangeException is to move the next offset to be
> consumed to the largest value on the Kafka topic.
> It appears that the exception is being eaten and not propagated up to the
> driver as well, so a workaround triggered by the propagation of the error
> cannot be implemented either.
>
> The current behavior of resetting to largest means that any data on the
> Kafka topic at the time the exception is thrown is skipped (lost) to
> consumption, and only data produced to the topic after the exception will be
> consumed. Two possible fixes are listed below.
>
> Fix 1: When "auto.offset.reset" is set to "smallest", the DStream should set
> the next consumed offset to be the smallest offset value on the Kafka topic.
>
> Fix 2: Propagate the error to the Driver to allow it to react as it deems
> appropriate.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
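The comment above suggests rate limiting (maxMessagesPerPartition, surfaced to users as the spark.streaming.kafka.maxRatePerPartition setting in Spark 1.5 era releases; check the Kafka integration guide for your version) to bound how many messages any single batch can cover. A back-of-the-envelope sketch of that bound, in plain Python, with hypothetical numbers matching the replication setup above:

```python
# Rough bound on per-batch exposure under direct-stream rate limiting.
# All values are assumptions for illustration; the setting name
# spark.streaming.kafka.maxRatePerPartition is messages/sec/partition.
batch_interval_sec = 60        # batch size used in the replication above
max_rate_per_partition = 100   # hypothetical rate limit, msgs/sec/partition
num_partitions = 2             # topic "test" above has partitions 0 and 1

max_msgs_per_batch = batch_interval_sec * max_rate_per_partition * num_partitions
print(max_msgs_per_batch)  # -> 12000
```

With this cap, a failed batch can cover at most 12000 messages instead of an unbounded backlog, which shrinks (but does not eliminate) the window of data at stake when retention expires mid-batch.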