[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sasaki Toru updated SPARK-20050:
--------------------------------
    Description:
I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString +
    " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records processed in the last batch before a graceful shutdown of Spark Streaming are reprocessed in the first batch after the application restarts, as below:

* output of the first run of this application
{code}
key: null value: 1 offset: 101452472
key: null value: 2 offset: 101452473
key: null value: 3 offset: 101452474
key: null value: 4 offset: 101452475
key: null value: 5 offset: 101452476
key: null value: 6 offset: 101452477
key: null value: 7 offset: 101452478
key: null value: 8 offset: 101452479
key: null value: 9 offset: 101452480  // the last record before Spark Streaming was shut down gracefully
{code}

* output of the re-run of this application
{code}
key: null value: 7 offset: 101452478  // duplicate
key: null value: 8 offset: 101452479  // duplicate
key: null value: 9 offset: 101452480  // duplicate
key: null value: 10 offset: 101452481
{code}

The cause may be that offsets passed to commitAsync are only committed at the head of the next batch.

  was: (the same description, without the inline comment marking the last record of the first run)
> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when
> graceful shutdown
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-20050
>                 URL: https://issues.apache.org/jira/browse/SPARK-20050
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Sasaki Toru
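As a note for anyone trying to reproduce this: a self-contained version of the snippet above might look like the sketch below. It is a sketch under stated assumptions, not the reporter's actual job: the broker address, topic name, group id, and batch interval are placeholders, and the graceful shutdown is requested through the spark.streaming.stopGracefullyOnShutdown setting.

{code}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

object CommitAsyncRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("CommitAsyncRepro")
      // Ask Spark to drain in-flight batches on shutdown instead of stopping hard.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder connection settings; auto-commit is disabled as in the report.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))

    kafkaStream.foreachRDD { rdd =>
      // Capture the offset ranges on the driver before acting on the RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { record =>
        println(s"key: ${record.key} value: ${record.value} offset: ${record.offset}")
      }
      // Queues the commit; per this report it is only sent with the next batch.
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}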
[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sasaki Toru updated SPARK-20050:
--------------------------------
    Description: (edited; this update added the example output of the first run and of the re-run, shown in the description above)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
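One way to observe when the commit actually reaches Kafka is the two-argument commitAsync overload on CanCommitOffsets, which accepts a Kafka OffsetCommitCallback. If the analysis in the report is right, the callback for the last batch only fires at the head of the next batch, and therefore never fires after a graceful shutdown. A minimal sketch, assuming kafkaStream is the stream from the example above:

{code}
import java.util.{Map => JMap}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges,
    new OffsetCommitCallback {
      override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit = {
        // Logs when the commit completes; per this report, that happens at the
        // start of the next batch, so the final batch's log line never appears.
        if (exception != null) {
          println(s"offset commit failed: ${exception.getMessage}")
        } else {
          println(s"offsets committed: $offsets")
        }
      }
    })
}
{code}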
[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sasaki Toru updated SPARK-20050:
--------------------------------
    Description: (minor wording edit to the description shown above)
     Issue Type: Bug  (was: Improvement)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
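Until this is fixed, a common mitigation is to treat this path as at-least-once delivery and make the downstream writes idempotent, for example by skipping any record at or below the highest offset already persisted for its partition. A minimal sketch; OffsetStore is a hypothetical helper backed by whatever store the job writes to, not a Spark or Kafka API:

{code}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not a real API): look up and persist the highest
// offset already written for a partition in the job's output store.
object OffsetStore extends Serializable {
  def lastWritten(tp: TopicPartition): Long = ???
  def markWritten(tp: TopicPartition, offset: Long): Unit = ???
}

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { record =>
      val tp = new TopicPartition(record.topic, record.partition)
      // Skip anything at or below the last offset this sink already wrote,
      // so records replayed after a restart become no-ops.
      if (record.offset > OffsetStore.lastWritten(tp)) {
        println(s"value: ${record.value} offset: ${record.offset}")
        OffsetStore.markWritten(tp, record.offset)
      }
    }
  }
}
{code}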