[jira] [Resolved] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2018-07-16 Thread Sasaki Toru (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru resolved SPARK-20050.
-
Resolution: Not A Problem

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>Priority: Major
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:
> * output of the first run of this application
> {code}
> key: null value: 1 offset: 101452472
> key: null value: 2 offset: 101452473
> key: null value: 3 offset: 101452474
> key: null value: 4 offset: 101452475
> key: null value: 5 offset: 101452476
> key: null value: 6 offset: 101452477
> key: null value: 7 offset: 101452478
> key: null value: 8 offset: 101452479
> key: null value: 9 offset: 101452480  // the last record before shutting down Spark Streaming gracefully
> {code}
> * output of the re-run of this application
> {code}
> key: null value: 7 offset: 101452478   // duplication
> key: null value: 8 offset: 101452479   // duplication
> key: null value: 9 offset: 101452480   // duplication
> key: null value: 10 offset: 101452481
> {code}
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.
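
For context, a minimal sketch of how the graceful shutdown described above is typically requested, assuming a StreamingContext named {{ssc}} (the ticket does not show the reporter's exact shutdown code):

{code}
// Ask Spark Streaming to finish the batches it has already generated
// before tearing the context down.
ssc.stop(stopSparkContext = true, stopGracefully = true)
{code}

Alternatively, setting spark.streaming.stopGracefullyOnShutdown=true makes Spark's own JVM shutdown hook stop the StreamingContext gracefully.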






[jira] [Commented] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-12-04 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16276892#comment-16276892
 ] 

Sasaki Toru commented on SPARK-20050:
-

Thank you for your comment.
I think this patch can be backported to branch-2.1 and will fix the same issue.

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:
> * output of the first run of this application
> {code}
> key: null value: 1 offset: 101452472
> key: null value: 2 offset: 101452473
> key: null value: 3 offset: 101452474
> key: null value: 4 offset: 101452475
> key: null value: 5 offset: 101452476
> key: null value: 6 offset: 101452477
> key: null value: 7 offset: 101452478
> key: null value: 8 offset: 101452479
> key: null value: 9 offset: 101452480  // the last record before shutting down Spark Streaming gracefully
> {code}
> * output of the re-run of this application
> {code}
> key: null value: 7 offset: 101452478   // duplication
> key: null value: 8 offset: 101452479   // duplication
> key: null value: 9 offset: 101452480   // duplication
> key: null value: 10 offset: 101452481
> {code}
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Comment Edited] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-12-04 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16276892#comment-16276892
 ] 

Sasaki Toru edited comment on SPARK-20050 at 12/4/17 2:54 PM:
--

Thank you for your comment.
I think this patch can be backported to branch-2.1 and will fix the same issue in
version 2.1.


was (Author: sasakitoa):
Thank you comment.
I think this patch can be backported to branch-2.1 and will fix same issue.

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:
> * output of the first run of this application
> {code}
> key: null value: 1 offset: 101452472
> key: null value: 2 offset: 101452473
> key: null value: 3 offset: 101452474
> key: null value: 4 offset: 101452475
> key: null value: 5 offset: 101452476
> key: null value: 6 offset: 101452477
> key: null value: 7 offset: 101452478
> key: null value: 8 offset: 101452479
> key: null value: 9 offset: 101452480  // the last record before shutting down Spark Streaming gracefully
> {code}
> * output of the re-run of this application
> {code}
> key: null value: 7 offset: 101452478   // duplication
> key: null value: 8 offset: 101452479   // duplication
> key: null value: 9 offset: 101452480   // duplication
> key: null value: 10 offset: 101452481
> {code}
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-23 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-20050:

Description: 
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:

* output of the first run of this application
{code}
key: null value: 1 offset: 101452472
key: null value: 2 offset: 101452473
key: null value: 3 offset: 101452474
key: null value: 4 offset: 101452475
key: null value: 5 offset: 101452476
key: null value: 6 offset: 101452477
key: null value: 7 offset: 101452478
key: null value: 8 offset: 101452479
key: null value: 9 offset: 101452480  // the last record before shutting down Spark Streaming gracefully
{code}

* output of the re-run of this application
{code}
key: null value: 7 offset: 101452478   // duplication
key: null value: 8 offset: 101452479   // duplication
key: null value: 9 offset: 101452480   // duplication
key: null value: 10 offset: 101452481
{code}

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.


  was:
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:

* output of the first run of this application
{code}
key: null value: 1 offset: 101452472
key: null value: 2 offset: 101452473
key: null value: 3 offset: 101452474
key: null value: 4 offset: 101452475
key: null value: 5 offset: 101452476
key: null value: 6 offset: 101452477
key: null value: 7 offset: 101452478
key: null value: 8 offset: 101452479
key: null value: 9 offset: 101452480
{code}

* output of the re-run of this application
{code}
key: null value: 7 offset: 101452478   // duplication
key: null value: 8 offset: 101452479   // duplication
key: null value: 9 offset: 101452480   // duplication
key: null value: 10 offset: 101452481
{code}

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.



> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:
> * output of the first run of this application
> {code}
> key: null value: 1 offset: 101452472
> key: null value: 2 offset: 101452473
> key: null value: 3 offset: 101452474
> key: null value: 4 offset: 101452475
> key: null value: 5 offset: 101452476
> key: null value: 6 offset: 101452477
> key: null value: 7 offset: 101452478
> key: null value: 8 offset: 101452479
> key: null value: 9 offset: 101452480  // the last record before shutting down Spark Streaming gracefully

[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-23 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-20050:

Description: 
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:

* output of the first run of this application
{code}
key: null value: 1 offset: 101452472
key: null value: 2 offset: 101452473
key: null value: 3 offset: 101452474
key: null value: 4 offset: 101452475
key: null value: 5 offset: 101452476
key: null value: 6 offset: 101452477
key: null value: 7 offset: 101452478
key: null value: 8 offset: 101452479
key: null value: 9 offset: 101452480
{code}

* output of the re-run of this application
{code}
key: null value: 7 offset: 101452478   // duplication
key: null value: 8 offset: 101452479   // duplication
key: null value: 9 offset: 101452480   // duplication
key: null value: 10 offset: 101452481
{code}

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.


  was:
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts.

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.



> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts, as below:
> * output of the first run of this application
> {code}
> key: null value: 1 offset: 101452472
> key: null value: 2 offset: 101452473
> key: null value: 3 offset: 101452474
> key: null value: 4 offset: 101452475
> key: null value: 5 offset: 101452476
> key: null value: 6 offset: 101452477
> key: null value: 7 offset: 101452478
> key: null value: 8 offset: 101452479
> key: null value: 9 offset: 101452480
> {code}
> * output of the re-run of this application
> {code}
> key: null value: 7 offset: 101452478   // duplication
> key: null value: 8 offset: 101452479   // duplication
> key: null value: 9 offset: 101452480   // duplication
> key: null value: 10 offset: 101452481
> {code}
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Comment Edited] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-23 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938988#comment-15938988
 ] 

Sasaki Toru edited comment on SPARK-20050 at 3/23/17 6:45 PM:
--

Thank you for your comment, but I can't understand your advice, sorry.

What I want to say is that some offsets passed to {{commitAsync}} are not committed to Kafka.
I think the callback function is invoked when the commit to Kafka completes
(whether it succeeds or fails),
so I think this function will not be invoked in this case.

If I am wrong, please correct me, thanks.


was (Author: sasakitoa):
Thank you for your comment, but I can't understand your advice, sorry.

I want to say some offset set in {{commitAsync}} will not commit to Kafka.
I think callback function will invoke when committed to Kafka completely 
(success or failed),
so I think this function will not be invoked in this case.

If I am wrong, please correct, thanks.

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts.
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Commented] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-23 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938988#comment-15938988
 ] 

Sasaki Toru commented on SPARK-20050:
-

Thank you for your comment, but I can't understand your advice, sorry.

What I want to say is that some offsets passed to {{commitAsync}} are not committed to Kafka.
I think the callback function is invoked when the commit to Kafka completes
(whether it succeeds or fails),
so I think this function will not be invoked in this case.

If I am wrong, please correct me, thanks.
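
For reference, {{CanCommitOffsets.commitAsync}} also has an overload that takes an OffsetCommitCallback, which can be used to observe whether the offsets were actually committed. A minimal sketch (names such as {{kafkaStream}} follow the example in the issue description; this is illustrative, not the reporter's code):

{code}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit = {
      // Called once the commit attempt finishes; exception is null on success.
      if (exception != null) println(s"offset commit failed: $exception")
      else println(s"offsets committed: $offsets")
    }
  })
}
{code}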

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts.
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Commented] (SPARK-20052) Some InputDStream needs closing processing after processing all batches when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935789#comment-15935789
 ] 

Sasaki Toru commented on SPARK-20052:
-

My explanation was not good, sorry.

This ticket is related to SPARK-20050.
When graceful shutdown is enabled, JobGenerator#stop waits for all batches to finish after
InputDStream#stop has been called,
but the Kafka 0.10 DirectStream should commit offsets after all batches have been processed.

So I thought an additional step (the "closing processing" I mentioned) is needed after
all batches have been processed, as sketched below.
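
A rough sketch of the kind of hook this asks for; {{onAllBatchesProcessed}} is a hypothetical method name, not an existing Spark API (the existing lifecycle methods are {{start}} and {{stop}}, and {{stop}} is called before the final batches of a graceful shutdown have run):

{code}
import scala.reflect.ClassTag

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

abstract class ClosableInputDStream[T: ClassTag](ssc: StreamingContext)
  extends InputDStream[T](ssc) {

  // Hypothetical extra hook, invoked only after the last batch generated for a
  // graceful shutdown has completed; a Kafka-backed stream could commit its
  // final offsets here instead of relying on a next batch that never runs.
  def onAllBatchesProcessed(): Unit = ()
}
{code}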


> Some InputDStream needs closing processing after processing all batches when 
> graceful shutdown
> --
>
> Key: SPARK-20052
> URL: https://issues.apache.org/jira/browse/SPARK-20052
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> Some classes that extend InputDStream need closing processing after all batches have
> been processed when graceful shutdown is enabled
> (e.g. when using Kafka as the data source, processed offsets need to be committed to
> the Kafka broker).
> InputDStream has a 'stop' method to stop receiving data, but this method is called
> before the last batches generated for the graceful shutdown are processed.






[jira] [Updated] (SPARK-20052) Some InputDStream needs closing processing after processing all batches when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-20052:

Summary: Some InputDStream needs closing processing after processing all 
batches when graceful shutdown  (was: Some InputDStream needs closing 
processing after all batches processed when graceful shutdown)

> Some InputDStream needs closing processing after processing all batches when 
> graceful shutdown
> --
>
> Key: SPARK-20052
> URL: https://issues.apache.org/jira/browse/SPARK-20052
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> Some classes that extend InputDStream need closing processing after all batches have
> been processed when graceful shutdown is enabled
> (e.g. when using Kafka as the data source, processed offsets need to be committed to
> the Kafka broker).
> InputDStream has a 'stop' method to stop receiving data, but this method is called
> before the last batches generated for the graceful shutdown are processed.






[jira] [Updated] (SPARK-20052) Some InputDStream needs closing processing after all batches processed when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-20052:

Description: 
Some classes that extend InputDStream need closing processing after all batches have been
processed when graceful shutdown is enabled
(e.g. when using Kafka as the data source, processed offsets need to be committed to the
Kafka broker).

InputDStream has a 'stop' method to stop receiving data, but this method is called before
the last batches generated for the graceful shutdown are processed.


  was:
Some class extend InputDStream needs closing processing after all batches 
processed when graceful shutdown enabled.
(e.g. When using Kafka as data source, need to commit processed offsets to 
Kafka Broker)

InputDStream has method 'stop' to stop receiving data, but this method will be 
called before processing last batches generated for graceful shutdown.



> Some InputDStream needs closing processing after all batches processed when 
> graceful shutdown
> -
>
> Key: SPARK-20052
> URL: https://issues.apache.org/jira/browse/SPARK-20052
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> Some classes that extend InputDStream need closing processing after all batches have
> been processed when graceful shutdown is enabled
> (e.g. when using Kafka as the data source, processed offsets need to be committed to
> the Kafka broker).
> InputDStream has a 'stop' method to stop receiving data, but this method is called
> before the last batches generated for the graceful shutdown are processed.






[jira] [Created] (SPARK-20052) Some InputDStream needs closing processing after all batches processed when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)
Sasaki Toru created SPARK-20052:
---

 Summary: Some InputDStream needs closing processing after all 
batches processed when graceful shutdown
 Key: SPARK-20052
 URL: https://issues.apache.org/jira/browse/SPARK-20052
 Project: Spark
  Issue Type: Improvement
  Components: DStreams
Affects Versions: 2.2.0
Reporter: Sasaki Toru


Some classes that extend InputDStream need closing processing after all batches have been
processed when graceful shutdown is enabled
(e.g. when using Kafka as the data source, processed offsets need to be committed to the
Kafka broker).

InputDStream has a 'stop' method to stop receiving data, but this method is called before
the last batches generated for the graceful shutdown are processed.







[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-20050:

Description: 
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts.

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.


  was:
I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts.

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.


 Issue Type: Bug  (was: Improvement)

> Kafka 0.10 DirectStream doesn't commit last processed batch's offset when 
> graceful shutdown
> ---
>
> Key: SPARK-20050
> URL: https://issues.apache.org/jira/browse/SPARK-20050
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Sasaki Toru
>
> I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
>
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
>
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful Streaming
> shutdown are reprocessed in the first batch after Spark Streaming restarts.
> The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
> batch.






[jira] [Created] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown

2017-03-21 Thread Sasaki Toru (JIRA)
Sasaki Toru created SPARK-20050:
---

 Summary: Kafka 0.10 DirectStream doesn't commit last processed 
batch's offset when graceful shutdown
 Key: SPARK-20050
 URL: https://issues.apache.org/jira/browse/SPARK-20050
 Project: Spark
  Issue Type: Improvement
  Components: DStreams
Affects Versions: 2.2.0
Reporter: Sasaki Toru


I use a Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful Streaming
shutdown are reprocessed in the first batch after Spark Streaming restarts.

The cause may be that offsets passed to commitAsync are committed only at the beginning of the next
batch.







[jira] [Created] (SPARK-13264) Remove multi-byte character in spark-env.sh.template

2016-02-09 Thread Sasaki Toru (JIRA)
Sasaki Toru created SPARK-13264:
---

 Summary: Remove multi-byte character in spark-env.sh.template
 Key: SPARK-13264
 URL: https://issues.apache.org/jira/browse/SPARK-13264
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.0
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 2.0.0


In spark-env.sh.template, there are multi-byte characters.






[jira] [Created] (SPARK-6736) Example of Graph#aggregateMessages has error

2015-04-07 Thread Sasaki Toru (JIRA)
Sasaki Toru created SPARK-6736:
--

 Summary: Example of Graph#aggregateMessages has error
 Key: SPARK-6736
 URL: https://issues.apache.org/jira/browse/SPARK-6736
 Project: Spark
  Issue Type: Improvement
  Components: GraphX
Affects Versions: 1.3.0
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 1.4.0


The example of Graph#aggregateMessages has an error.
Since aggregateMessages is a method of Graph, it should be written as
rawGraph.aggregateMessages.
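
A minimal sketch of the corrected call form, assuming {{rawGraph}} is an existing {{Graph[VD, ED]}} (the concrete graph from the documentation example is not reproduced here):

{code}
// aggregateMessages is called on the Graph instance itself; here it counts,
// per vertex, how many in-edges that vertex has.
val inDegreeCounts = rawGraph.aggregateMessages[Int](
  edgeContext => edgeContext.sendToDst(1), // send a 1 along every edge
  (a, b) => a + b                          // merge the messages arriving at a vertex
)
{code}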






[jira] [Updated] (SPARK-6736) [GraphX]Example of Graph#aggregateMessages has error

2015-04-07 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-6736:
---
Summary: [GraphX]Example of Graph#aggregateMessages has error  (was: 
Example of Graph#aggregateMessages has error)

 [GraphX]Example of Graph#aggregateMessages has error
 

 Key: SPARK-6736
 URL: https://issues.apache.org/jira/browse/SPARK-6736
 Project: Spark
  Issue Type: Improvement
  Components: GraphX
Affects Versions: 1.3.0
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 1.4.0


 The example of Graph#aggregateMessages has an error.
 Since aggregateMessages is a method of Graph, it should be written as
 rawGraph.aggregateMessages.






[jira] [Updated] (SPARK-6736) [GraphX]Example of Graph#aggregateMessages has error

2015-04-07 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated SPARK-6736:
---
Component/s: Documentation

 [GraphX]Example of Graph#aggregateMessages has error
 

 Key: SPARK-6736
 URL: https://issues.apache.org/jira/browse/SPARK-6736
 Project: Spark
  Issue Type: Improvement
  Components: Documentation, GraphX
Affects Versions: 1.3.0
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 1.4.0


 The example of Graph#aggregateMessages has an error.
 Since aggregateMessages is a method of Graph, it should be written as
 rawGraph.aggregateMessages.






[jira] [Created] (SPARK-4742) The name of Parquet File generated by AppendingParquetOutputFormat should be zero padded

2014-12-04 Thread Sasaki Toru (JIRA)
Sasaki Toru created SPARK-4742:
--

 Summary: The name of Parquet File generated by 
AppendingParquetOutputFormat should be zero padded
 Key: SPARK-4742
 URL: https://issues.apache.org/jira/browse/SPARK-4742
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.0
Reporter: Sasaki Toru
Priority: Minor


When I write a Parquet file as an output file using
ParquetOutputFormat#getDefaultWorkFile, the file name is not zero-padded, while
RDD#saveAsTextFile does zero padding.
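
An illustration of why the padding matters (not the actual ParquetOutputFormat code; the names below are made up for the example):

{code}
// Zero-padded names keep their numeric order under lexicographic sorting,
// which is how most tools list part files.
val partition = 3
val padded   = f"part-r-$partition%05d.parquet" // "part-r-00003.parquet" sorts before "part-r-00010.parquet"
val unpadded = s"part-r-$partition.parquet"     // "part-r-3.parquet" sorts after "part-r-10.parquet"
{code}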


