Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
In either case, an end-to-end exactly-once guarantee can be ensured only
if the output sink is updated transactionally. The engine has to re-execute
data on failure. The exactly-once guarantee means that the external storage is
updated as if each data record was computed exactly once. That's why you
need to update it transactionally, to handle possible recomputations.

This is true for both Spark Streaming and Structured Streaming. Hope this
helps.
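The transactional-update pattern described above can be sketched without Spark at all. The following Python simulation is illustrative only (the `batch_output`/`offsets` schema and the `commit_batch` helper are made up for this sketch, not a Spark or Kafka API): batch results and the consumed offset range are committed in a single transaction keyed by batch time, so re-executing the same batch after recovery overwrites its previous row instead of double-counting.

```python
import sqlite3

# In-memory database standing in for an external transactional sink.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE batch_output (batch_time INTEGER PRIMARY KEY, cnt INTEGER)")
conn.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, until_offset INTEGER)")

def commit_batch(batch_time, records, part, until_offset):
    """Write batch results and consumed offsets in ONE transaction.

    Keying the output by batch_time makes a replayed batch overwrite
    its earlier row rather than append a duplicate.
    """
    with conn:  # one atomic transaction: both writes commit or neither does
        conn.execute(
            "INSERT OR REPLACE INTO batch_output VALUES (?, ?)",
            (batch_time, len(records)),
        )
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?)",
            (part, until_offset),
        )

# First execution of the batch for time 1496684280000 ms.
commit_batch(1496684280000, ["a", "b", "c"], 0, 2000)
# The same batch re-executed after a failure and restart: no double counting.
commit_batch(1496684280000, ["a", "b", "c"], 0, 2000)

total = conn.execute("SELECT SUM(cnt) FROM batch_output").fetchone()[0]
print(total)  # 3, not 6
```

A non-transactional sink (e.g. appending to a file or incrementing a counter) would see the replayed batch twice, which is exactly the at-least-once behavior the checkpoint recovery exposes.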



Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
Thanks TD.
In pre-Structured Streaming (the DStream API), an exactly-once guarantee on
the input is not provided, is it?
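What the direct stream does give is a deterministic replay: the checkpoint pins each batch to an exact Kafka offset range, so a restart re-reads precisely the same records. A toy Python simulation (no Spark or Kafka involved; the log, checkpoint, and sink here are plain dicts invented for illustration) shows how a keyed, idempotent downstream write turns that at-least-once replay into an exactly-once result:

```python
# Simulated Kafka partition: offset -> record.
log = {off: f"msg-{off}" for off in range(0, 6)}

checkpoint = {"from": 0, "until": 4}   # offset range pinned by the checkpoint
sink = {}                              # keyed sink: (partition, offset) -> record

def run_batch():
    """Process the checkpointed offset range; keyed writes are idempotent."""
    for off in range(checkpoint["from"], checkpoint["until"]):
        sink[(0, off)] = log[off]      # a replay overwrites, never duplicates

run_batch()        # original run
run_batch()        # replay after restart: the same offsets are re-read
print(len(sink))   # 4 records, despite processing the range twice
```

If the sink were a list being appended to instead of a dict keyed by (partition, offset), the replay would leave 8 records, which is the duplication observed in the logs below.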



Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases.
If you are starting to play with Spark Streaming, I highly recommend
learning Structured Streaming
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
instead.


Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the Spark application fails after the successful completion of a batch
(1496684280000 ms in the log below) and restarts, it duplicates that last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 1496684280000 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 1496684280000 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 1496684280000 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 1496684280000 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000', took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 1496684280000 ms

After the restart,

17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1496684280000 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 1496684280000 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 1496684280000 ms.0 from job set of time 1496684280000 ms
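The "Batches during down time (10 batches)" list in the log is mechanical: the JobGenerator walks forward from the last checkpointed batch time in 30-second steps up to the restart instant, and the last completed batch is included in the replay, which is the duplication being asked about. A small Python sketch (times in epoch milliseconds; the restart instant of ~13:42:31 is read off the restart log line, so it is approximate) reproduces the count:

```python
batch_interval_ms = 30_000                  # 30 sec batch duration
last_checkpointed = 1_496_684_280_000       # the 13:38:00 batch from the log
restart_time      = 1_496_684_551_000       # ~13:42:31, from the restart log

# Batch times falling inside the down window, starting at (and including)
# the last checkpointed batch -- hence the completed batch is rescheduled.
down_batches = list(range(last_checkpointed, restart_time, batch_interval_ms))
print(len(down_batches))  # 10 batches, matching the JobGenerator log line
```

So the replay of the 13:38:00 batch is not a scheduling accident: the checkpoint records the batch's offsets before the output is known to be durable, and recovery conservatively re-runs it.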




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
