[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276892#comment-16276892 ]
Sasaki Toru edited comment on SPARK-20050 at 12/4/17 2:54 PM: -------------------------------------------------------------- Thank you comment. I think this patch can be backported to branch-2.1 and will fix same issue in version 2.1. was (Author: sasakitoa): Thank you comment. I think this patch can be backported to branch-2.1 and will fix same issue. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > ------------------------------------------------------------------------------------------- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams > Affects Versions: 2.2.0 > Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 // this is a last record before > shutdown Spark Streaming gracefully > {\code} > * output re-run of this application > {code} > key: null value: 7 offset: 101452478 // duplication > key: null value: 8 offset: 101452479 // duplication > key: null value: 9 offset: 101452480 // duplication > key: null value: 10 offset: 101452481 > {\code} > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org