[jira] [Resolved] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru resolved SPARK-20050. - Resolution: Not A Problem > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru >Priority: Major > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 // this is a last record before > shutdown Spark Streaming gracefully > {\code} > * output re-run of this application > {code} > key: null value: 7 offset: 101452478 // duplication > key: null value: 8 offset: 101452479 // duplication > key: null value: 9 offset: 101452480 // duplication > key: null value: 10 offset: 101452481 > {\code} > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16276892#comment-16276892 ] Sasaki Toru commented on SPARK-20050: - Thank you comment. I think this patch can be backported to branch-2.1 and will fix same issue. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 // this is a last record before > shutdown Spark Streaming gracefully > {\code} > * output re-run of this application > {code} > key: null value: 7 offset: 101452478 // duplication > key: null value: 8 offset: 101452479 // duplication > key: null value: 9 offset: 101452480 // duplication > key: null value: 10 offset: 101452481 > {\code} > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16276892#comment-16276892 ] Sasaki Toru edited comment on SPARK-20050 at 12/4/17 2:54 PM: -- Thank you comment. I think this patch can be backported to branch-2.1 and will fix same issue in version 2.1. was (Author: sasakitoa): Thank you comment. I think this patch can be backported to branch-2.1 and will fix same issue. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 // this is a last record before > shutdown Spark Streaming gracefully > {\code} > * output re-run of this application > {code} > key: null value: 7 offset: 101452478 // duplication > key: null value: 8 offset: 101452479 // duplication > key: null value: 9 offset: 101452480 // duplication > key: null value: 10 offset: 101452481 > {\code} > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-20050: Description: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart, such below * output first run of this application {code} key: null value: 1 offset: 101452472 key: null value: 2 offset: 101452473 key: null value: 3 offset: 101452474 key: null value: 4 offset: 101452475 key: null value: 5 offset: 101452476 key: null value: 6 offset: 101452477 key: null value: 7 offset: 101452478 key: null value: 8 offset: 101452479 key: null value: 9 offset: 101452480 // this is a last record before shutdown Spark Streaming gracefully {\code} * output re-run of this application {code} key: null value: 7 offset: 101452478 // duplication key: null value: 8 offset: 101452479 // duplication key: null value: 9 offset: 101452480 // duplication key: null value: 10 offset: 101452481 {\code} It may cause offsets specified in commitAsync will commit in the head of next batch. was: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart, such below * output first run of this application {code} key: null value: 1 offset: 101452472 key: null value: 2 offset: 101452473 key: null value: 3 offset: 101452474 key: null value: 4 offset: 101452475 key: null value: 5 offset: 101452476 key: null value: 6 offset: 101452477 key: null value: 7 offset: 101452478 key: null value: 8 offset: 101452479 key: null value: 9 offset: 101452480 {\code} * output re-run of this application {code} key: null value: 7 offset: 101452478 // duplication key: null value: 8 offset: 101452479 // duplication key: null value: 9 offset: 101452480 // duplication key: null value: 10 offset: 101452481 {\code} It may cause offsets specified in commitAsync will commit in the head of next batch. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 //
[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-20050: Description: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart, such below * output first run of this application {code} key: null value: 1 offset: 101452472 key: null value: 2 offset: 101452473 key: null value: 3 offset: 101452474 key: null value: 4 offset: 101452475 key: null value: 5 offset: 101452476 key: null value: 6 offset: 101452477 key: null value: 7 offset: 101452478 key: null value: 8 offset: 101452479 key: null value: 9 offset: 101452480 {\code} * output re-run of this application {code} key: null value: 7 offset: 101452478 // duplication key: null value: 8 offset: 101452479 // duplication key: null value: 9 offset: 101452480 // duplication key: null value: 10 offset: 101452481 {\code} It may cause offsets specified in commitAsync will commit in the head of next batch. was: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart. It may cause offsets specified in commitAsync will commit in the head of next batch. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart, such > below > * output first run of this application > {code} > key: null value: 1 offset: 101452472 > key: null value: 2 offset: 101452473 > key: null value: 3 offset: 101452474 > key: null value: 4 offset: 101452475 > key: null value: 5 offset: 101452476 > key: null value: 6 offset: 101452477 > key: null value: 7 offset: 101452478 > key: null value: 8 offset: 101452479 > key: null value: 9 offset: 101452480 > {\code} > * output re-run of this application > {code} > key: null value: 7 offset: 101452478 // duplication > key: null value: 8 offset: 101452479 // duplication > key: null value: 9 offset: 101452480 // duplication > key: null value: 10 offset: 101452481 > {\code} > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938988#comment-15938988 ] Sasaki Toru edited comment on SPARK-20050 at 3/23/17 6:45 PM: -- Thank you for your comment, but I can't understand your advice, sorry. I want to say some offset set in {{commitAsync}} will not commit to Kafka. I think callback function will be invoked when committed to Kafka completely (success or failed), so I think this function will not be invoked in this case. If I am wrong, please correct, thanks. was (Author: sasakitoa): Thank you for your comment, but I can't understand your advice, sorry. I want to say some offset set in {{commitAsync}} will not commit to Kafka. I think callback function will invoke when committed to Kafka completely (success or failed), so I think this function will not be invoked in this case. If I am wrong, please correct, thanks. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart. > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938988#comment-15938988 ] Sasaki Toru commented on SPARK-20050: - Thank you for your comment, but I can't understand your advice, sorry. I want to say some offset set in {{commitAsync}} will not commit to Kafka. I think callback function will invoke when committed to Kafka completely (success or failed), so I think this function will not be invoked in this case. If I am wrong, please correct, thanks. > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart. > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20052) Some InputDStream needs closing processing after processing all batches when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935789#comment-15935789 ] Sasaki Toru commented on SPARK-20052: - My explain is not good, sorry. This ticket is related to SPARK-20050. In JobGenerate#stop, it will wait for finishing all batches after InputDStream#stop called when graceful shutdown is enable, but Kafka 0.10 DirectStream should commit offset after processing all batches. So I thought more process(I explained this "closing process") is needed after processing all batches. > Some InputDStream needs closing processing after processing all batches when > graceful shutdown > -- > > Key: SPARK-20052 > URL: https://issues.apache.org/jira/browse/SPARK-20052 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > Some class extend InputDStream needs closing processing after processing all > batches when graceful shutdown enabled. > (e.g. When using Kafka as data source, need to commit processed offsets to > Kafka Broker) > InputDStream has method 'stop' to stop receiving data, but this method will > be called before processing last batches generated for graceful shutdown. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20052) Some InputDStream needs closing processing after processing all batches when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-20052: Summary: Some InputDStream needs closing processing after processing all batches when graceful shutdown (was: Some InputDStream needs closing processing after all batches processed when graceful shutdown) > Some InputDStream needs closing processing after processing all batches when > graceful shutdown > -- > > Key: SPARK-20052 > URL: https://issues.apache.org/jira/browse/SPARK-20052 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > Some class extend InputDStream needs closing processing after processing all > batches when graceful shutdown enabled. > (e.g. When using Kafka as data source, need to commit processed offsets to > Kafka Broker) > InputDStream has method 'stop' to stop receiving data, but this method will > be called before processing last batches generated for graceful shutdown. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20052) Some InputDStream needs closing processing after all batches processed when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-20052: Description: Some class extend InputDStream needs closing processing after processing all batches when graceful shutdown enabled. (e.g. When using Kafka as data source, need to commit processed offsets to Kafka Broker) InputDStream has method 'stop' to stop receiving data, but this method will be called before processing last batches generated for graceful shutdown. was: Some class extend InputDStream needs closing processing after all batches processed when graceful shutdown enabled. (e.g. When using Kafka as data source, need to commit processed offsets to Kafka Broker) InputDStream has method 'stop' to stop receiving data, but this method will be called before processing last batches generated for graceful shutdown. > Some InputDStream needs closing processing after all batches processed when > graceful shutdown > - > > Key: SPARK-20052 > URL: https://issues.apache.org/jira/browse/SPARK-20052 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > Some class extend InputDStream needs closing processing after processing all > batches when graceful shutdown enabled. > (e.g. When using Kafka as data source, need to commit processed offsets to > Kafka Broker) > InputDStream has method 'stop' to stop receiving data, but this method will > be called before processing last batches generated for graceful shutdown. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20052) Some InputDStream needs closing processing after all batches processed when graceful shutdown
Sasaki Toru created SPARK-20052: --- Summary: Some InputDStream needs closing processing after all batches processed when graceful shutdown Key: SPARK-20052 URL: https://issues.apache.org/jira/browse/SPARK-20052 Project: Spark Issue Type: Improvement Components: DStreams Affects Versions: 2.2.0 Reporter: Sasaki Toru Some class extend InputDStream needs closing processing after all batches processed when graceful shutdown enabled. (e.g. When using Kafka as data source, need to commit processed offsets to Kafka Broker) InputDStream has method 'stop' to stop receiving data, but this method will be called before processing last batches generated for graceful shutdown. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-20050: Description: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart. It may cause offsets specified in commitAsync will commit in the head of next batch. was: I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart. It may cause offsets specified in commitAsync will commit in the head of next batch. Issue Type: Bug (was: Improvement) > Kafka 0.10 DirectStream doesn't commit last processed batch's offset when > graceful shutdown > --- > > Key: SPARK-20050 > URL: https://issues.apache.org/jira/browse/SPARK-20050 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.0 >Reporter: Sasaki Toru > > I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and > call 'DirectKafkaInputDStream#commitAsync' finally in each batches, such > below > {code} > val kafkaStream = KafkaUtils.createDirectStream[String, String](...) > kafkaStream.map { input => > "key: " + input.key.toString + " value: " + input.value.toString + " > offset: " + input.offset.toString > }.foreachRDD { rdd => > rdd.foreach { input => > println(input) > } > } > kafkaStream.foreachRDD { rdd => > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) > } > {\code} > Some records which processed in the last batch before Streaming graceful > shutdown reprocess in the first batch after Spark Streaming restart. > It may cause offsets specified in commitAsync will commit in the head of next > batch. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
Sasaki Toru created SPARK-20050: --- Summary: Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown Key: SPARK-20050 URL: https://issues.apache.org/jira/browse/SPARK-20050 Project: Spark Issue Type: Improvement Components: DStreams Affects Versions: 2.2.0 Reporter: Sasaki Toru I use Kafka 0.10 DirectStream with properties 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' finally in each batches such below {code} val kafkaStream = KafkaUtils.createDirectStream[String, String](...) kafkaStream.map { input => "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString }.foreachRDD { rdd => rdd.foreach { input => println(input) } } kafkaStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } {\code} Some records which processed in the last batch before Streaming graceful shutdown reprocess in the first batch after Spark Streaming restart. It may cause offsets specified in commitAsync will commit in the head of next batch. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13264) Remove multi-byte character in spark-env.sh.template
Sasaki Toru created SPARK-13264: --- Summary: Remove multi-byte character in spark-env.sh.template Key: SPARK-13264 URL: https://issues.apache.org/jira/browse/SPARK-13264 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.0.0 Reporter: Sasaki Toru Priority: Minor Fix For: 2.0.0 In spark-env.sh.template, there are multi-byte characters. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6736) Example of Graph#aggregateMessages has error
Sasaki Toru created SPARK-6736: -- Summary: Example of Graph#aggregateMessages has error Key: SPARK-6736 URL: https://issues.apache.org/jira/browse/SPARK-6736 Project: Spark Issue Type: Improvement Components: GraphX Affects Versions: 1.3.0 Reporter: Sasaki Toru Priority: Minor Fix For: 1.4.0 Example of Graph#aggregateMessages has error. Since aggregateMessages is a method of Graph, It should be written rawGraph.aggregateMessages -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6736) [GraphX]Example of Graph#aggregateMessages has error
[ https://issues.apache.org/jira/browse/SPARK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-6736: --- Summary: [GraphX]Example of Graph#aggregateMessages has error (was: Example of Graph#aggregateMessages has error) [GraphX]Example of Graph#aggregateMessages has error Key: SPARK-6736 URL: https://issues.apache.org/jira/browse/SPARK-6736 Project: Spark Issue Type: Improvement Components: GraphX Affects Versions: 1.3.0 Reporter: Sasaki Toru Priority: Minor Fix For: 1.4.0 Example of Graph#aggregateMessages has error. Since aggregateMessages is a method of Graph, It should be written rawGraph.aggregateMessages -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6736) [GraphX]Example of Graph#aggregateMessages has error
[ https://issues.apache.org/jira/browse/SPARK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasaki Toru updated SPARK-6736: --- Component/s: Documentation [GraphX]Example of Graph#aggregateMessages has error Key: SPARK-6736 URL: https://issues.apache.org/jira/browse/SPARK-6736 Project: Spark Issue Type: Improvement Components: Documentation, GraphX Affects Versions: 1.3.0 Reporter: Sasaki Toru Priority: Minor Fix For: 1.4.0 Example of Graph#aggregateMessages has error. Since aggregateMessages is a method of Graph, It should be written rawGraph.aggregateMessages -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4742) The name of Parquet File generated by AppendingParquetOutputFormat should be zero padded
Sasaki Toru created SPARK-4742: -- Summary: The name of Parquet File generated by AppendingParquetOutputFormat should be zero padded Key: SPARK-4742 URL: https://issues.apache.org/jira/browse/SPARK-4742 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.2.0 Reporter: Sasaki Toru Priority: Minor When I use Parquet File as a output file using ParquetOutputFormat#getDefaultWorkFile, the file name is not zero padded while RDD#saveAsText does zero padding. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org