[Apache Spark][Streaming Job][Checkpoint] Spark job failed on Checkpoint recovery with Batch not found error

2020-05-28 Thread taylorwu
Hi,

We have a Spark 2.4 job that fails on checkpoint recovery every few hours with
the following error (from the driver log):

driver spark-kubernetes-driver ERROR 20:38:51 ERROR MicroBatchExecution: Query impressionUpdate [id = 54614900-4145-4d60-8156-9746ffc13d1f, runId = 3637c2f3-49b6-40c2-b6d0-7edb28361c5d] terminated with error
java.lang.IllegalStateException: batch 946 doesn't exist
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:406)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

And the executor logs show this error:

 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

How should I fix this?
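
For context, a minimal sketch of how a Structured Streaming query like this is typically wired up. The checkpointLocation directory holds the offsets/ and commits/ logs that the driver reads on recovery, which is the kind of lookup that fails with "batch N doesn't exist"; everything below (source, topic, paths, sink) is hypothetical and not the poster's actual code.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ImpressionUpdateSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("impressionUpdate-sketch").getOrCreate();

    // Hypothetical Kafka source; the actual source is not shown in the thread.
    Dataset<Row> events = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "impressions")
        .load();

    // The checkpointLocation is the directory consulted on restart.
    StreamingQuery query = events.writeStream()
        .queryName("impressionUpdate")                                               // query name seen in the driver log
        .format("parquet")
        .option("path", "s3a://bucket/output/impressions")                           // hypothetical output path
        .option("checkpointLocation", "s3a://bucket/checkpoints/impressionUpdate")   // hypothetical
        .start();

    query.awaitTermination();
  }
}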






Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Jörn Franke

That is the cost of exactly once :) 

> On 22 Jun 2016, at 12:54, sandesh deshmane  wrote:
> 
> We are going with checkpointing . we don't have identifier available to 
> identify if the message is already processed or not .
> Even if we had it, then it will slow down the processing as we do get 300k 
> messages per sec , so lookup will slow down.
> 
> Thanks
> Sandesh
> 
>> On Wed, Jun 22, 2016 at 3:28 PM, Jörn Franke  wrote:
>> 
>> Spark Streamig does not guarantee exactly once for output action. It means 
>> that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which 
>> messages have been proceed (some identifier available?) and do not re 
>> process them  You could also store (safely) what range has been already 
>> processed etc
>> 
>> Think about the business case if exactly once is needed or if it can be 
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more effort 
>> and usually the throughput is lower. A risk evaluation from a business point 
>> of view has to be done anyway...
>> 
>> > On 22 Jun 2016, at 09:09, sandesh deshmane  wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault 
>> > tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per values 
>> > received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is restarted I 
>> > see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In case 
>> > application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
> 


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Cody Koeninger
The direct stream doesn't automagically give you exactly-once
semantics.  Indeed, you should be pretty suspicious of anything that
claims to give you end-to-end exactly-once semantics without any
additional work on your part.

To the original poster, have you read / watched the materials linked
from the page below?  That should clarify what your options are.

https://github.com/koeninger/kafka-exactly-once
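
For readers who don't follow the link: the approach discussed there boils down to grabbing the exact Kafka offset ranges for each batch's RDD and persisting them idempotently or transactionally together with the results. A minimal sketch against the 0.8 direct-stream API, with the actual store left as a placeholder:

import org.apache.spark.TaskContext;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

import java.util.Iterator;

public class OffsetTrackingSketch {
  // messages is the stream created with KafkaUtils.createDirectStream(...)
  static void process(JavaPairInputDStream<String, String> messages) {
    messages.foreachRDD(rdd -> {
      // Offset ranges are only defined on RDDs produced by the direct stream.
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

      rdd.foreachPartition((Iterator<Tuple2<String, String>> records) -> {
        OffsetRange range = offsetRanges[TaskContext.get().partitionId()];
        while (records.hasNext()) {
          Tuple2<String, String> record = records.next();
          // ... write record._2() to the sink (hypothetical)
        }
        // Persist the processed range atomically with the results, or skip
        // ranges already stored; that is what turns at-least-once delivery
        // into effectively exactly-once output.
        // saveRange(range.topic(), range.partition(), range.fromOffset(), range.untilOffset());
      });
    });
  }
}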

On Wed, Jun 22, 2016 at 5:55 AM, Denys Cherepanin  wrote:
> Hi Sandesh,
>
> As I understand you are using "receiver based" approach to integrate kafka
> with spark streaming.
>
> Did you tried "direct" approach ? In this case offsets will be tracked by
> streaming app via check-pointing and you should achieve exactly-once
> semantics
>
> On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:
>>
>>
>> Spark Streamig does not guarantee exactly once for output action. It means
>> that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which
>> messages have been proceed (some identifier available?) and do not re
>> process them  You could also store (safely) what range has been already
>> processed etc
>>
>> Think about the business case if exactly once is needed or if it can be
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more effort
>> and usually the throughput is lower. A risk evaluation from a business point
>> of view has to be done anyway...
>>
>> > On 22 Jun 2016, at 09:09, sandesh deshmane 
>> > wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from
>> > Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> > tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per
>> > values received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is restarted
>> > I see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In
>> > case application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Yours faithfully, Denys Cherepanin




Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
We have not tried the direct approach. We are using the receiver-based approach
(we use ZooKeeper to connect from Spark).

We have around 20+ Kafka brokers, and sometimes we replace them (they go
down). So each time I need to change the broker list in the Spark application
and restart the streaming app.

Thanks
Sandesh

On Wed, Jun 22, 2016 at 4:25 PM, Denys Cherepanin 
wrote:

> Hi Sandesh,
>
> As I understand you are using "receiver based" approach to integrate kafka
> with spark streaming.
>
> Did you tried "direct" approach
> 
>  ?
> In this case offsets will be tracked by streaming app via check-pointing
> and you should achieve exactly-once semantics
>
> On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:
>
>>
>> Spark Streamig does not guarantee exactly once for output action. It
>> means that one item is only processed in an RDD.
>> You can achieve at most once or at least once.
>> You could however do at least once (via checkpoing) and record which
>> messages have been proceed (some identifier available?) and do not re
>> process them  You could also store (safely) what range has been already
>> processed etc
>>
>> Think about the business case if exactly once is needed or if it can be
>> replaced by one of the others.
>> Exactly once, it needed requires in any system including spark more
>> effort and usually the throughput is lower. A risk evaluation from a
>> business point of view has to be done anyway...
>>
>> > On 22 Jun 2016, at 09:09, sandesh deshmane 
>> wrote:
>> >
>> > Hi,
>> >
>> > I am writing spark streaming application which reads messages from
>> Kafka.
>> >
>> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> tolerance .
>> >
>> > I have created batch size of 10 sec for reading messages from kafka.
>> >
>> > I read messages for kakfa and generate the count of messages as per
>> values received from Kafka message.
>> >
>> > In case there is failure and my spark streaming application is
>> restarted I see duplicate messages processed ( which is close to 2 batches)
>> >
>> > The problem that I have is per sec I get around 300k messages and In
>> case application is restarted I see around 3-5 million duplicate counts.
>> >
>> > How to avoid such duplicates?
>> >
>> > what is best to way to recover from such failures ?
>> >
>> > Thanks
>> > Sandesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Yours faithfully, Denys Cherepanin
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Denys Cherepanin
Hi Sandesh,

As I understand it, you are using the "receiver-based" approach to integrate Kafka
with Spark Streaming.

Have you tried the "direct" approach? In that case offsets will be tracked by the
streaming app via checkpointing, and you should achieve exactly-once semantics.
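
For reference, a minimal sketch of creating such a direct stream with the 0.8 Kafka integration; broker list and topic are placeholders:

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DirectStreamSketch {
  static JavaPairInputDStream<String, String> createStream(JavaStreamingContext jssc) {
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");   // placeholder brokers

    Set<String> topics = new HashSet<>(Arrays.asList("events"));            // placeholder topic

    // No receivers and no WAL: each batch reads a fixed offset range straight
    // from Kafka, and those ranges are what gets recorded in the checkpoint.
    return KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,                  // key / value types
        StringDecoder.class, StringDecoder.class,    // key / value decoders
        kafkaParams, topics);
  }
}

Note Cody Koeninger's caveat elsewhere in this thread: the direct stream gives you exact offset ranges per batch, but end-to-end exactly-once still depends on how the output side is written.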

On Wed, Jun 22, 2016 at 5:58 AM, Jörn Franke  wrote:

>
> Spark Streamig does not guarantee exactly once for output action. It means
> that one item is only processed in an RDD.
> You can achieve at most once or at least once.
> You could however do at least once (via checkpoing) and record which
> messages have been proceed (some identifier available?) and do not re
> process them  You could also store (safely) what range has been already
> processed etc
>
> Think about the business case if exactly once is needed or if it can be
> replaced by one of the others.
> Exactly once, it needed requires in any system including spark more effort
> and usually the throughput is lower. A risk evaluation from a business
> point of view has to be done anyway...
>
> > On 22 Jun 2016, at 09:09, sandesh deshmane 
> wrote:
> >
> > Hi,
> >
> > I am writing spark streaming application which reads messages from Kafka.
> >
> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
> >
> > I have created batch size of 10 sec for reading messages from kafka.
> >
> > I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
> >
> > In case there is failure and my spark streaming application is restarted
> I see duplicate messages processed ( which is close to 2 batches)
> >
> > The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
> >
> > How to avoid such duplicates?
> >
> > what is best to way to recover from such failures ?
> >
> > Thanks
> > Sandesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Yours faithfully, Denys Cherepanin


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
We are going with checkpointing. We don't have an identifier available to
tell whether a message has already been processed.
Even if we had one, it would slow down processing, since we get 300k
messages per second and the lookup would add overhead.

Thanks
Sandesh

On Wed, Jun 22, 2016 at 3:28 PM, Jörn Franke  wrote:

>
> Spark Streamig does not guarantee exactly once for output action. It means
> that one item is only processed in an RDD.
> You can achieve at most once or at least once.
> You could however do at least once (via checkpoing) and record which
> messages have been proceed (some identifier available?) and do not re
> process them  You could also store (safely) what range has been already
> processed etc
>
> Think about the business case if exactly once is needed or if it can be
> replaced by one of the others.
> Exactly once, it needed requires in any system including spark more effort
> and usually the throughput is lower. A risk evaluation from a business
> point of view has to be done anyway...
>
> > On 22 Jun 2016, at 09:09, sandesh deshmane 
> wrote:
> >
> > Hi,
> >
> > I am writing spark streaming application which reads messages from Kafka.
> >
> > I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
> >
> > I have created batch size of 10 sec for reading messages from kafka.
> >
> > I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
> >
> > In case there is failure and my spark streaming application is restarted
> I see duplicate messages processed ( which is close to 2 batches)
> >
> > The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
> >
> > How to avoid such duplicates?
> >
> > what is best to way to recover from such failures ?
> >
> > Thanks
> > Sandesh
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Jörn Franke

Spark Streaming does not guarantee exactly-once for output actions; it only guarantees
that each item is processed once within an RDD.
You can achieve at-most-once or at-least-once.
You could, however, do at-least-once (via checkpointing) and record which messages
have been processed (is some identifier available?) so that you do not re-process them.
You could also store (safely) which ranges have already been processed, etc.

Think about the business case: is exactly-once really needed, or can it be
replaced by one of the others?
Exactly-once, if needed, requires more effort in any system (including Spark), and
usually the throughput is lower. A risk evaluation from a business point of
view has to be done anyway...
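
A minimal sketch of the "record what has been processed" idea, assuming messages carry some identifier; the in-memory set below only stands in for whatever durable store you would use in practice:

import org.apache.spark.streaming.api.java.JavaDStream;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class DedupSketch {
  // Stand-in for a durable store (e.g. a table keyed by message id).
  // A driver-local set is NOT fault tolerant; it only illustrates the check.
  private static final Set<String> processedIds =
      Collections.synchronizedSet(new HashSet<>());

  static void process(JavaDStream<String> messages) {
    messages.foreachRDD(rdd -> {
      for (String msg : rdd.collect()) {      // collect() only to keep the sketch small
        String id = extractId(msg);           // hypothetical identifier in the payload
        if (processedIds.add(id)) {           // add() returns false if the id was seen before
          // ... perform the output action once per id
        }
      }
    });
  }

  private static String extractId(String msg) {
    return msg.split(",")[0];                 // hypothetical message format
  }
}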

> On 22 Jun 2016, at 09:09, sandesh deshmane  wrote:
> 
> Hi,
> 
> I am writing spark streaming application which reads messages from Kafka.
> 
> I am using checkpointing and write ahead logs ( WAL) to achieve fault 
> tolerance .
> 
> I have created batch size of 10 sec for reading messages from kafka.
> 
> I read messages for kakfa and generate the count of messages as per values 
> received from Kafka message.
> 
> In case there is failure and my spark streaming application is restarted I 
> see duplicate messages processed ( which is close to 2 batches)
> 
> The problem that I have is per sec I get around 300k messages and In case 
> application is restarted I see around 3-5 million duplicate counts.
> 
> How to avoid such duplicates?
> 
> what is best to way to recover from such failures ?
> 
> Thanks
> Sandesh




Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
Hi Sandesh,

Where do these messages end up? Are they written to a sink (file, database, etc.)?

What is the reason your app fails? Can that be remedied to reduce the
impact?

How do you identify that duplicates are sent and processed?

Cheers,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 22 June 2016 at 10:38, sandesh deshmane  wrote:

> Mich Talebzadeh thanks for reply.
>
> we have retention policy of 4 hours for kafka messages and we have
> multiple other consumers which reads from kafka cluster. ( spark is one of
> them)
>
> we have timestamp in message, but we actually have multiple message with
> same time stamp. its very hard to differentiate.
>
> But we have some offset with kafka and consumer keeps tracks for offset
> and Consumer should read from offset.
>
> so its problem with kafka , not with Spark?
>
> Any way to rectify this?
> we don't have id in messages. if we keep id also , then we keep map where
> we will put the ids and mark them processed, but at run time i need to do
> that lookup and for us , the number of messages is very high, so look up
> will ad up in processing time ?
>
> Thanks
> Sandesh Deshmane
>
> On Wed, Jun 22, 2016 at 2:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Yes this is more of Kafka issue as Kafka send the messages again.
>>
>> In your topic do messages come with an ID or timestamp where you can
>> reject them if they have already been processed.
>>
>> In other words do you have a way what message was last processed via
>> Spark before failing.
>>
>> You can of course  reset Kafka retention time and purge it
>>
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter
>> --topic newtopic --config retention.ms=1000
>>
>>
>>
>> Wait for a minute and then reset back
>>
>>
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter
>> --topic newtopic --config retention.ms=60
>>
>>
>>
>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> --from-beginning --topic newtopic
>>
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 June 2016 at 09:57, sandesh deshmane 
>> wrote:
>>
>>> Here I refer to failure in spark app.
>>>
>>> So When I restart , i see duplicate messages.
>>>
>>> To replicate the scenario , i just do kill mysparkapp and then restart .
>>>
>>> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 As I see it you are using Spark streaming to read data from source
 through Kafka. Your batch interval is 10 sec, so in that interval you have
 10*300K = 3Milion messages

 When you say there is failure are you referring to the failure in the
 source or in Spark streaming app?

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 22 June 2016 at 08:09, sandesh deshmane 
 wrote:

> Hi,
>
> I am writing spark streaming application which reads messages from
> Kafka.
>
> I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
>
> I have created batch size of 10 sec for reading messages from kafka.
>
> I read messages for kakfa and generate the count of messages as per
> values received from Kafka message.
>
> In case there is failure and my spark streaming application is
> restarted I see duplicate messages processed ( which is close to 2 
> batches)
>
> The problem that I have is per sec I get around 300k messages and In
> case application is restarted I see around 3-5 million duplicate counts.
>
> How to avoid such duplicates?
>
> what is best to way to recover from such failures ?
>
> Thanks
> Sandesh
>


>>>
>>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Mich Talebzadeh, thanks for the reply.

We have a retention policy of 4 hours for Kafka messages, and we have multiple
other consumers that read from the Kafka cluster (Spark is one of them).

We have a timestamp in each message, but we actually have multiple messages with
the same timestamp, so it's very hard to differentiate them.

But we do have offsets in Kafka, and the consumer keeps track of its offset and
should read from that offset.

So is this a problem with Kafka, not with Spark?

Is there any way to rectify it?
We don't have an id in the messages. Even if we added one, we would have to keep
a map where we put the ids and mark them as processed, but at run time I would
need to do that lookup, and for us the number of messages is very high, so the
lookup would add to the processing time.

Thanks
Sandesh Deshmane

On Wed, Jun 22, 2016 at 2:36 PM, Mich Talebzadeh 
wrote:

> Yes this is more of Kafka issue as Kafka send the messages again.
>
> In your topic do messages come with an ID or timestamp where you can
> reject them if they have already been processed.
>
> In other words do you have a way what message was last processed via Spark
> before failing.
>
> You can of course  reset Kafka retention time and purge it
>
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
> newtopic --config retention.ms=1000
>
>
>
> Wait for a minute and then reset back
>
>
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
> newtopic --config retention.ms=60
>
>
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --from-beginning --topic newtopic
>
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 09:57, sandesh deshmane  wrote:
>
>> Here I refer to failure in spark app.
>>
>> So When I restart , i see duplicate messages.
>>
>> To replicate the scenario , i just do kill mysparkapp and then restart .
>>
>> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> As I see it you are using Spark streaming to read data from source
>>> through Kafka. Your batch interval is 10 sec, so in that interval you have
>>> 10*300K = 3Milion messages
>>>
>>> When you say there is failure are you referring to the failure in the
>>> source or in Spark streaming app?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 22 June 2016 at 08:09, sandesh deshmane 
>>> wrote:
>>>
 Hi,

 I am writing spark streaming application which reads messages from
 Kafka.

 I am using checkpointing and write ahead logs ( WAL) to achieve fault
 tolerance .

 I have created batch size of 10 sec for reading messages from kafka.

 I read messages for kakfa and generate the count of messages as per
 values received from Kafka message.

 In case there is failure and my spark streaming application is
 restarted I see duplicate messages processed ( which is close to 2 batches)

 The problem that I have is per sec I get around 300k messages and In
 case application is restarted I see around 3-5 million duplicate counts.

 How to avoid such duplicates?

 what is best to way to recover from such failures ?

 Thanks
 Sandesh

>>>
>>>
>>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
Yes, this is more of a Kafka issue, as Kafka sends the messages again.

In your topic, do messages come with an ID or timestamp by which you can reject
them if they have already been processed?

In other words, do you have a way of knowing which message was last processed via
Spark before failing?

You can of course reset the Kafka retention time and purge the topic:


${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
newtopic --config retention.ms=1000



Wait for a minute and then reset back



${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper rhes564:2181 --alter --topic
newtopic --config retention.ms=60



${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--from-beginning --topic newtopic



HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 22 June 2016 at 09:57, sandesh deshmane  wrote:

> Here I refer to failure in spark app.
>
> So When I restart , i see duplicate messages.
>
> To replicate the scenario , i just do kill mysparkapp and then restart .
>
> On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> As I see it you are using Spark streaming to read data from source
>> through Kafka. Your batch interval is 10 sec, so in that interval you have
>> 10*300K = 3Milion messages
>>
>> When you say there is failure are you referring to the failure in the
>> source or in Spark streaming app?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 June 2016 at 08:09, sandesh deshmane 
>> wrote:
>>
>>> Hi,
>>>
>>> I am writing spark streaming application which reads messages from Kafka.
>>>
>>> I am using checkpointing and write ahead logs ( WAL) to achieve fault
>>> tolerance .
>>>
>>> I have created batch size of 10 sec for reading messages from kafka.
>>>
>>> I read messages for kakfa and generate the count of messages as per
>>> values received from Kafka message.
>>>
>>> In case there is failure and my spark streaming application is restarted
>>> I see duplicate messages processed ( which is close to 2 batches)
>>>
>>> The problem that I have is per sec I get around 300k messages and In
>>> case application is restarted I see around 3-5 million duplicate counts.
>>>
>>> How to avoid such duplicates?
>>>
>>> what is best to way to recover from such failures ?
>>>
>>> Thanks
>>> Sandesh
>>>
>>
>>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Here I am referring to a failure in the Spark app.

So when I restart, I see duplicate messages.

To replicate the scenario, I just kill my Spark app and then restart it.

On Wed, Jun 22, 2016 at 1:10 PM, Mich Talebzadeh 
wrote:

> As I see it you are using Spark streaming to read data from source through
> Kafka. Your batch interval is 10 sec, so in that interval you have 10*300K
> = 3Milion messages
>
> When you say there is failure are you referring to the failure in the
> source or in Spark streaming app?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 June 2016 at 08:09, sandesh deshmane  wrote:
>
>> Hi,
>>
>> I am writing spark streaming application which reads messages from Kafka.
>>
>> I am using checkpointing and write ahead logs ( WAL) to achieve fault
>> tolerance .
>>
>> I have created batch size of 10 sec for reading messages from kafka.
>>
>> I read messages for kakfa and generate the count of messages as per
>> values received from Kafka message.
>>
>> In case there is failure and my spark streaming application is restarted
>> I see duplicate messages processed ( which is close to 2 batches)
>>
>> The problem that I have is per sec I get around 300k messages and In case
>> application is restarted I see around 3-5 million duplicate counts.
>>
>> How to avoid such duplicates?
>>
>> what is best to way to recover from such failures ?
>>
>> Thanks
>> Sandesh
>>
>
>


Re: how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread Mich Talebzadeh
As I see it, you are using Spark Streaming to read data from the source through
Kafka. Your batch interval is 10 sec, so in that interval you have 10 * 300K
= 3 million messages.

When you say there is a failure, are you referring to a failure in the
source or in the Spark Streaming app?

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 22 June 2016 at 08:09, sandesh deshmane  wrote:

> Hi,
>
> I am writing spark streaming application which reads messages from Kafka.
>
> I am using checkpointing and write ahead logs ( WAL) to achieve fault
> tolerance .
>
> I have created batch size of 10 sec for reading messages from kafka.
>
> I read messages for kakfa and generate the count of messages as per values
> received from Kafka message.
>
> In case there is failure and my spark streaming application is restarted I
> see duplicate messages processed ( which is close to 2 batches)
>
> The problem that I have is per sec I get around 300k messages and In case
> application is restarted I see around 3-5 million duplicate counts.
>
> How to avoid such duplicates?
>
> what is best to way to recover from such failures ?
>
> Thanks
> Sandesh
>


how to avoid duplicate messages with spark streaming using checkpoint after restart in case of failure

2016-06-22 Thread sandesh deshmane
Hi,

I am writing a Spark Streaming application that reads messages from Kafka.

I am using checkpointing and write-ahead logs (WAL) to achieve fault
tolerance.

I have set a batch interval of 10 sec for reading messages from Kafka.

I read the messages from Kafka and generate counts of messages based on the
values received in the Kafka messages.

In case there is a failure and my Spark Streaming application is restarted, I
see duplicate messages processed (close to 2 batches' worth).

The problem is that I get around 300k messages per second, so when the
application is restarted I see around 3-5 million duplicate counts.

How can I avoid such duplicates?

What is the best way to recover from such failures?

Thanks
Sandesh
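
For reference, checkpoint-based recovery for such a job is usually wired through JavaStreamingContext.getOrCreate, so that a restart resumes the DStream graph and WAL metadata from the checkpoint directory; a minimal sketch with placeholder paths:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RecoverableStreamSketch {
  private static final String CHECKPOINT_DIR = "hdfs:///checkpoints/counter";   // placeholder

  public static void main(String[] args) throws Exception {
    // On a clean start the factory builds a fresh context; after a crash the
    // context is rebuilt from the checkpoint and the factory is not called.
    JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, RecoverableStreamSketch::createContext);
    jssc.start();
    jssc.awaitTermination();
  }

  private static JavaStreamingContext createContext() {
    SparkConf conf = new SparkConf()
        .setAppName("kafka-counter-sketch")
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");   // WAL, as in the thread
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    jssc.checkpoint(CHECKPOINT_DIR);
    // ... create the Kafka stream and the counting logic here
    return jssc;
  }
}

Note that this only restores the streaming metadata; as the replies above point out, the receiver-based approach can still replay the last batch or two, so deduplication has to happen on the output side.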


Re: Spark Streaming data checkpoint performance

2015-11-07 Thread trung kien
Hmm,

That seems like just a workaround.
Using this method, it's very hard to recover from a failure, since we don't
know which batches have been completed.

I really want to maintain the whole running stats in memory to achieve full
fault tolerance.

I just wonder whether the performance of data checkpointing is really that bad,
or whether I am missing something in my setup.

30 seconds for a data checkpoint of 1M keys is too much for me.


On Sat, Nov 7, 2015 at 1:25 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com
> wrote:

> It depends on the stats you are collecting. For example, if you just
> collecting counts, you can do away with updateStateByKey completely by
> doing insert or update operation on the data store after reduce. I.e.
>
> For each (key, batchCount)
>   if (key exists in dataStore)
> update count = count + batchCount for the key
>  else
> insert (key, batchCount)
>
> Thanks,
> Aniket
>
> On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang...@gmail.com>
> wrote:
>
>> Thanks Aniket,
>>
>> I want to store the state to an external storage but it should be in
>> later step I think.
>> Basically, I have to use updateStateByKey function to maintain the
>> running state (which requires checkpoint), and my bottleneck is now in data
>> checkpoint.
>>
>> My pseudo code is like below:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(
>> sparkConf,Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createDirectStream(...);
>> JavaPairDStream<String, List> stats =
>> messages.mapToPair(parseJson)
>> .reduceByKey(REDUCE_STATS)
>> .updateStateByKey(RUNNING_STATS);
>>
>>JavaPairDStream<String, List> newData =
>> stages.filter(NEW_STATS);
>>
>>newData.foreachRDD{
>>  rdd.forEachPartition{
>>//Store to external storage.
>>  }
>>   }
>>
>>   Without using updateStageByKey, I'm only have the stats of the last
>> micro-batch.
>>
>> Any advise on this?
>>
>>
>> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:
>>
>>> Can you try storing the state (word count) in an external key value
>>> store?
>>>
>>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Anyone could help me on this. It's a bit urgent for me on this.
>>>> I'm very confused and curious about Spark data checkpoint performance?
>>>> Is there any detail implementation of checkpoint I can look into?
>>>> Spark Streaming only take sub-second to process 20K messages/sec,
>>>> however it take 25 seconds for checkpoint. Now my application have average
>>>> 30 seconds latency and keep increasingly.
>>>>
>>>>
>>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>>>
>>>>> Thankd all, it would be great to have this feature soon.
>>>>> Do you know what's the release plan for 1.6?
>>>>>
>>>>> In addition to this, I still have checkpoint performance problem
>>>>>
>>>>> My code is just simple like this:
>>>>> JavaStreamingContext jssc = new
>>>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>>>> jssc.checkpoint("spark-data/checkpoint");
>>>>> JavaPairInputDStream<String, String> messages =
>>>>> KafkaUtils.createDirectStream(...);
>>>>> JavaPairDStream<String, List> stats =
>>>>> messages.mapToPair(parseJson)
>>>>> .reduceByKey(REDUCE_STATS)
>>>>> .updateStateByKey(RUNNING_STATS);
>>>>>
>>>>> stats.print()
>>>>>
>>>>>   Now I need to maintain about 800k keys, the stats here is only count
>>>>> number of occurence for key.
>>>>>   While running the cache dir is very small (about 50M), my question
>>>>> is:
>>>>>
>>>>>   1/ For regular micro-batch it takes about 800ms to finish, but every
>>>>> 10 seconds when data checkpoint is running
>>>>>   It took me 5 seconds to finish the same size micro-batch, why it's
>>>>> too high? what's kind of job in checkpoint?
>>>>>   why it's keep inc

Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng Lê
Hi all,

Could anyone help me with this? It's a bit urgent for me.
I'm very confused and curious about Spark data checkpoint performance. Is
there any detailed documentation of the checkpoint implementation I can look into?
Spark Streaming takes only sub-second to process 20K messages/sec; however,
it takes 25 seconds for a checkpoint. Now my application has an average latency
of 30 seconds, and it keeps increasing.


2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:

> Thankd all, it would be great to have this feature soon.
> Do you know what's the release plan for 1.6?
>
> In addition to this, I still have checkpoint performance problem
>
> My code is just simple like this:
> JavaStreamingContext jssc = new
> JavaStreamingContext(sparkConf,Durations.seconds(2));
> jssc.checkpoint("spark-data/checkpoint");
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(...);
> JavaPairDStream<String, List> stats =
> messages.mapToPair(parseJson)
> .reduceByKey(REDUCE_STATS)
> .updateStateByKey(RUNNING_STATS);
>
> stats.print()
>
>   Now I need to maintain about 800k keys, the stats here is only count
> number of occurence for key.
>   While running the cache dir is very small (about 50M), my question is:
>
>   1/ For regular micro-batch it takes about 800ms to finish, but every 10
> seconds when data checkpoint is running
>   It took me 5 seconds to finish the same size micro-batch, why it's too
> high? what's kind of job in checkpoint?
>   why it's keep increasing?
>
>   2/ When I changes the data checkpoint interval like using:
>   stats.checkpoint(Durations.seconds(100)); //change to 100, defaults
> is 10
>
>   The checkpoint is keep increasing significantly first checkpoint is 10s,
> second is 30s, third is 70s ... and keep increasing :)
>   Why it's too high when increasing checkpoint interval?
>
> It seems that default interval works more stable.
>
> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>
>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>
>> Left some comments on the JIRA and design doc.
>>
>> -adrian
>>
>> From: Shixiong Zhu
>> Date: Tuesday, November 3, 2015 at 3:32 AM
>> To: Thúy Hằng Lê
>> Cc: Adrian Tanase, "user@spark.apache.org"
>> Subject: Re: Spark Streaming data checkpoint performance
>>
>> "trackStateByKey" is about to be added in 1.6 to resolve the performance
>> issue of "updateStateByKey". You can take a look at
>> https://issues.apache.org/jira/browse/SPARK-2629 and
>> https://github.com/apache/spark/pull/9256
>>
>


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
Can you try storing the state (word count) in an external key value store?

On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:

> Hi all,
>
> Anyone could help me on this. It's a bit urgent for me on this.
> I'm very confused and curious about Spark data checkpoint performance? Is
> there any detail implementation of checkpoint I can look into?
> Spark Streaming only take sub-second to process 20K messages/sec, however
> it take 25 seconds for checkpoint. Now my application have average 30
> seconds latency and keep increasingly.
>
>
> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>
>> Thankd all, it would be great to have this feature soon.
>> Do you know what's the release plan for 1.6?
>>
>> In addition to this, I still have checkpoint performance problem
>>
>> My code is just simple like this:
>> JavaStreamingContext jssc = new
>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createDirectStream(...);
>> JavaPairDStream<String, List> stats =
>> messages.mapToPair(parseJson)
>> .reduceByKey(REDUCE_STATS)
>> .updateStateByKey(RUNNING_STATS);
>>
>> stats.print()
>>
>>   Now I need to maintain about 800k keys, the stats here is only count
>> number of occurence for key.
>>   While running the cache dir is very small (about 50M), my question is:
>>
>>   1/ For regular micro-batch it takes about 800ms to finish, but every 10
>> seconds when data checkpoint is running
>>   It took me 5 seconds to finish the same size micro-batch, why it's too
>> high? what's kind of job in checkpoint?
>>   why it's keep increasing?
>>
>>   2/ When I changes the data checkpoint interval like using:
>>   stats.checkpoint(Durations.seconds(100)); //change to 100, defaults
>> is 10
>>
>>   The checkpoint is keep increasing significantly first checkpoint is
>> 10s, second is 30s, third is 70s ... and keep increasing :)
>>   Why it's too high when increasing checkpoint interval?
>>
>> It seems that default interval works more stable.
>>
>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>
>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>
>>> Left some comments on the JIRA and design doc.
>>>
>>> -adrian
>>>
>>> From: Shixiong Zhu
>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>> To: Thúy Hằng Lê
>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>> Subject: Re: Spark Streaming data checkpoint performance
>>>
>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>> performance issue of "updateStateByKey". You can take a look at
>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>> https://github.com/apache/spark/pull/9256
>>>
>>
>


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
It depends on the stats you are collecting. For example, if you are just
collecting counts, you can do away with updateStateByKey completely by
doing an insert-or-update operation on the data store after the reduce, i.e.:

For each (key, batchCount)
  if (key exists in dataStore)
    update count = count + batchCount for the key
  else
    insert (key, batchCount)

Thanks,
Aniket
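
A minimal sketch of that insert-or-update after the reduce, assuming a JDBC store with a table stats(stat_key, cnt) and MySQL-style upsert syntax; all names are hypothetical:

import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

public class UpsertSketch {
  private static final String JDBC_URL = "jdbc:mysql://db-host/stats_db?user=app";   // placeholder

  // counts is the per-batch result of mapToPair(...).reduceByKey(...); no updateStateByKey needed
  static void store(JavaPairDStream<String, Long> counts) {
    counts.foreachRDD(rdd ->
        rdd.foreachPartition((Iterator<Tuple2<String, Long>> part) -> {
          try (Connection conn = DriverManager.getConnection(JDBC_URL);
               PreparedStatement ps = conn.prepareStatement(
                   "INSERT INTO stats(stat_key, cnt) VALUES (?, ?) "
                 + "ON DUPLICATE KEY UPDATE cnt = cnt + VALUES(cnt)")) {
            while (part.hasNext()) {
              Tuple2<String, Long> kv = part.next();
              ps.setString(1, kv._1());
              ps.setLong(2, kv._2());
              ps.addBatch();
            }
            ps.executeBatch();   // one round trip per partition rather than per key
          }
        }));
  }
}

The running total then lives in the store instead of in Spark's checkpointed state, which is the point of the suggestion above.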

On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:

> Thanks Aniket,
>
> I want to store the state to an external storage but it should be in later
> step I think.
> Basically, I have to use updateStateByKey function to maintain the
> running state (which requires checkpoint), and my bottleneck is now in data
> checkpoint.
>
> My pseudo code is like below:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> sparkConf,Durations.seconds(2));
> jssc.checkpoint("spark-data/checkpoint");
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(...);
> JavaPairDStream<String, List> stats =
> messages.mapToPair(parseJson)
> .reduceByKey(REDUCE_STATS)
> .updateStateByKey(RUNNING_STATS);
>
>JavaPairDStream<String, List> newData =
> stages.filter(NEW_STATS);
>
>newData.foreachRDD{
>  rdd.forEachPartition{
>//Store to external storage.
>  }
>   }
>
>   Without using updateStageByKey, I'm only have the stats of the last
> micro-batch.
>
> Any advise on this?
>
>
> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:
>
>> Can you try storing the state (word count) in an external key value store?
>>
>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Anyone could help me on this. It's a bit urgent for me on this.
>>> I'm very confused and curious about Spark data checkpoint performance?
>>> Is there any detail implementation of checkpoint I can look into?
>>> Spark Streaming only take sub-second to process 20K messages/sec,
>>> however it take 25 seconds for checkpoint. Now my application have average
>>> 30 seconds latency and keep increasingly.
>>>
>>>
>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>>
>>>> Thankd all, it would be great to have this feature soon.
>>>> Do you know what's the release plan for 1.6?
>>>>
>>>> In addition to this, I still have checkpoint performance problem
>>>>
>>>> My code is just simple like this:
>>>> JavaStreamingContext jssc = new
>>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>>> jssc.checkpoint("spark-data/checkpoint");
>>>> JavaPairInputDStream<String, String> messages =
>>>> KafkaUtils.createDirectStream(...);
>>>> JavaPairDStream<String, List> stats =
>>>> messages.mapToPair(parseJson)
>>>> .reduceByKey(REDUCE_STATS)
>>>> .updateStateByKey(RUNNING_STATS);
>>>>
>>>> stats.print()
>>>>
>>>>   Now I need to maintain about 800k keys, the stats here is only count
>>>> number of occurence for key.
>>>>   While running the cache dir is very small (about 50M), my question is:
>>>>
>>>>   1/ For regular micro-batch it takes about 800ms to finish, but every
>>>> 10 seconds when data checkpoint is running
>>>>   It took me 5 seconds to finish the same size micro-batch, why it's
>>>> too high? what's kind of job in checkpoint?
>>>>   why it's keep increasing?
>>>>
>>>>   2/ When I changes the data checkpoint interval like using:
>>>>   stats.checkpoint(Durations.seconds(100)); //change to 100,
>>>> defaults is 10
>>>>
>>>>   The checkpoint is keep increasing significantly first checkpoint is
>>>> 10s, second is 30s, third is 70s ... and keep increasing :)
>>>>   Why it's too high when increasing checkpoint interval?
>>>>
>>>> It seems that default interval works more stable.
>>>>
>>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>>
>>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>>
>>>>> Left some comments on the JIRA and design doc.
>>>>>
>>>>> -adrian
>>>>>
>>>>> From: Shixiong Zhu
>>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>>> To: Thúy Hằng Lê
>>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>>
>>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>>> performance issue of "updateStateByKey". You can take a look at
>>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>>> https://github.com/apache/spark/pull/9256
>>>>>
>>>>
>>>
>


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng Lê
Thanks Aniket,

I want to store the state in external storage, but that should come in a later
step, I think.
Basically, I have to use the updateStateByKey function to maintain the running
state (which requires checkpointing), and my bottleneck is now in the data
checkpoint.

My pseudo code is like below:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
jssc.checkpoint("spark-data/checkpoint");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
    .reduceByKey(REDUCE_STATS)
    .updateStateByKey(RUNNING_STATS);

JavaPairDStream<String, List> newData = stats.filter(NEW_STATS);

newData.foreachRDD{
  rdd.forEachPartition{
    // Store to external storage.
  }
}

Without using updateStateByKey, I only have the stats of the last
micro-batch.

Any advice on this?


2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:

> Can you try storing the state (word count) in an external key value store?
>
> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>
>> Hi all,
>>
>> Anyone could help me on this. It's a bit urgent for me on this.
>> I'm very confused and curious about Spark data checkpoint performance? Is
>> there any detail implementation of checkpoint I can look into?
>> Spark Streaming only take sub-second to process 20K messages/sec, however
>> it take 25 seconds for checkpoint. Now my application have average 30
>> seconds latency and keep increasingly.
>>
>>
>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>
>>> Thankd all, it would be great to have this feature soon.
>>> Do you know what's the release plan for 1.6?
>>>
>>> In addition to this, I still have checkpoint performance problem
>>>
>>> My code is just simple like this:
>>> JavaStreamingContext jssc = new
>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>> jssc.checkpoint("spark-data/checkpoint");
>>> JavaPairInputDStream<String, String> messages =
>>> KafkaUtils.createDirectStream(...);
>>> JavaPairDStream<String, List> stats =
>>> messages.mapToPair(parseJson)
>>> .reduceByKey(REDUCE_STATS)
>>> .updateStateByKey(RUNNING_STATS);
>>>
>>> stats.print()
>>>
>>>   Now I need to maintain about 800k keys, the stats here is only count
>>> number of occurence for key.
>>>   While running the cache dir is very small (about 50M), my question is:
>>>
>>>   1/ For regular micro-batch it takes about 800ms to finish, but every
>>> 10 seconds when data checkpoint is running
>>>   It took me 5 seconds to finish the same size micro-batch, why it's too
>>> high? what's kind of job in checkpoint?
>>>   why it's keep increasing?
>>>
>>>   2/ When I changes the data checkpoint interval like using:
>>>   stats.checkpoint(Durations.seconds(100)); //change to 100,
>>> defaults is 10
>>>
>>>   The checkpoint is keep increasing significantly first checkpoint is
>>> 10s, second is 30s, third is 70s ... and keep increasing :)
>>>   Why it's too high when increasing checkpoint interval?
>>>
>>> It seems that default interval works more stable.
>>>
>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>
>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>
>>>> Left some comments on the JIRA and design doc.
>>>>
>>>> -adrian
>>>>
>>>> From: Shixiong Zhu
>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>> To: Thúy Hằng Lê
>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>
>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>> performance issue of "updateStateByKey". You can take a look at
>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>> https://github.com/apache/spark/pull/9256
>>>>
>>>
>>


Re: Spark Streaming data checkpoint performance

2015-11-05 Thread Thúy Hằng Lê
Thanks all, it would be great to have this feature soon.
Do you know what the release plan for 1.6 is?

In addition to this, I still have a checkpoint performance problem.

My code is as simple as this:
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
jssc.checkpoint("spark-data/checkpoint");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
    .reduceByKey(REDUCE_STATS)
    .updateStateByKey(RUNNING_STATS);

stats.print()

Now I need to maintain about 800k keys; the stats here are only the count of
occurrences per key.
While running, the cache dir is very small (about 50M). My questions are:

1/ A regular micro-batch takes about 800ms to finish, but every 10 seconds,
when the data checkpoint is running, it takes 5 seconds to finish the
same-size micro-batch. Why is it so high? What kind of job does the checkpoint
run, and why does it keep increasing?

2/ When I change the data checkpoint interval, e.g.:
  stats.checkpoint(Durations.seconds(100)); // change to 100, default is 10

the checkpoint time keeps increasing significantly: the first checkpoint is 10s,
the second is 30s, the third is 70s ... and it keeps increasing :)
Why is it so high when I increase the checkpoint interval?

It seems that the default interval works more stably.

On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:

> Nice! Thanks for sharing, I wasn’t aware of the new API.
>
> Left some comments on the JIRA and design doc.
>
> -adrian
>
> From: Shixiong Zhu
> Date: Tuesday, November 3, 2015 at 3:32 AM
> To: Thúy Hằng Lê
> Cc: Adrian Tanase, "user@spark.apache.org"
> Subject: Re: Spark Streaming data checkpoint performance
>
> "trackStateByKey" is about to be added in 1.6 to resolve the performance
> issue of "updateStateByKey". You can take a look at
> https://issues.apache.org/jira/browse/SPARK-2629 and
> https://github.com/apache/spark/pull/9256
>


Re: Spark Streaming data checkpoint performance

2015-11-04 Thread Adrian Tanase
Nice! Thanks for sharing, I wasn’t aware of the new API.

Left some comments on the JIRA and design doc.

-adrian

From: Shixiong Zhu
Date: Tuesday, November 3, 2015 at 3:32 AM
To: Thúy Hằng Lê
Cc: Adrian Tanase, "user@spark.apache.org"
Subject: Re: Spark Streaming data checkpoint performance

"trackStateByKey" is about to be added in 1.6 to resolve the performance issue 
of "updateStateByKey". You can take a look at 
https://issues.apache.org/jira/browse/SPARK-2629 and 
https://github.com/apache/spark/pull/9256


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Adrian Tanase
You are correct, the default checkpointing interval is 10 seconds or your batch
interval, whichever is bigger. You can change it by calling .checkpoint(x) on your
resulting DStream.

For the rest, you are probably keeping an "all time" word count that grows
unbounded if you never remove words from the map. Keep in mind that
updateStateByKey is called for every key in the state RDD, regardless of whether
you have new occurrences or not.

You should consider at least one of these strategies (see the sketch after this message):

  * Run your word count on a windowed DStream (e.g. unique counts over the
    last 15 minutes). Your best bet here is reduceByKeyAndWindow with an
    inverse function.
  * Make your state object more complicated and try to prune out words with
    very few occurrences or that haven't been updated for a long time.
    You can do this by emitting None from updateStateByKey.

Hope this helps,
-adrian
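
A minimal sketch of the two strategies plus the checkpoint-interval tweak, against the 1.x Java API (the Optional below is the Guava one that updateStateByKey used in that version; window sizes and the pruning threshold are arbitrary):

import com.google.common.base.Optional;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import java.util.List;

public class StateStrategiesSketch {

  static void apply(JavaPairDStream<String, Long> counts) {
    // Strategy 1: windowed counts with an inverse function, so each slide only
    // folds in the batches entering the window and subtracts the ones leaving it.
    JavaPairDStream<String, Long> windowed = counts.reduceByKeyAndWindow(
        (a, b) -> a + b,             // add batches entering the window
        (a, b) -> a - b,             // subtract batches leaving the window
        Durations.minutes(15),       // window length
        Durations.seconds(10));      // slide interval
    // The DStream checkpoint interval can also be raised explicitly:
    windowed.checkpoint(Durations.seconds(100));

    // Strategy 2: prune state by returning an absent Optional, which removes
    // the key from the state RDD.
    JavaPairDStream<String, Long> pruned = counts.updateStateByKey(
        (List<Long> newValues, Optional<Long> state) -> {
          long sum = state.or(0L);
          for (Long v : newValues) {
            sum += v;
          }
          // Arbitrary pruning rule: drop rarely seen keys that got no new data.
          if (newValues.isEmpty() && sum < 5) {
            return Optional.absent();
          }
          return Optional.of(sum);
        });

    windowed.print();
    pruned.print();
  }
}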

From: Thúy Hằng Lê
Date: Monday, November 2, 2015 at 7:20 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Spark Streaming data checkpoint performance

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(2));


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Thúy Hằng Lê
Hi Adrian,

Thanks for the information.

However, your two suggestions don't really work for me.

Accuracy is the most important aspect of my application, so keeping only
15-minute window stats or pruning out some of the keys is impossible for my
application.

I can change the checkpoint interval as you suggest;
however, is there any other Spark configuration for tuning the data
checkpoint performance?

And just out of curiosity: technically, why does updateStateByKey need to be
called for every key (regardless of whether there are new occurrences or not)?
Does Spark have any plan to fix this?
I have 4M keys whose statistics need to be maintained, but only a few of them
change in each batch interval.
2015-11-02 22:37 GMT+07:00 Adrian Tanase <atan...@adobe.com>:

> You are correct, the default checkpointing interval is 10 seconds or your
> batch size, whichever is bigger. You can change it by calling
> .checkpoint(x) on your resulting Dstream.
>
> For the rest, you are probably keeping an “all time” word count that grows
> unbounded if you never remove words from the map. Keep in mind that
> updateStateByKey is called for every key in the state RDD, regardless if
> you have new occurrences or not.
>
> You should consider at least one of these strategies:
>
>- run your word count on a windowed Dstream (e.g. Unique counts over
>the last 15 minutes)
>   - Your best bet here is reduceByKeyAndWindow with an inverse
>   function
>- Make your state object more complicated and try to prune out words
>with very few occurrences or that haven’t been updated for a long time
>   - You can do this by emitting None from updateStateByKey
>
> Hope this helps,
> -adrian
>
> From: Thúy Hằng Lê
> Date: Monday, November 2, 2015 at 7:20 AM
> To: "user@spark.apache.org"
> Subject: Spark Streaming data checkpoint performance
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(2));
>


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Shixiong Zhu
"trackStateByKey" is about to be added in 1.6 to resolve the performance
issue of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256
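
That work eventually shipped in 1.6 under the name mapWithState. A minimal sketch of the Java API for a running count like the ones in this thread (the exact Optional class differs between 1.6 and 2.x):

import org.apache.spark.api.java.Optional;   // com.google.common.base.Optional in 1.6.x
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class MapWithStateSketch {

  static JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> runningCounts(
      JavaPairDStream<String, Long> batchCounts) {

    // Unlike updateStateByKey, the function runs only for keys with new data
    // in the batch (plus keys that time out), not for every key in the state.
    Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mappingFunc =
        (key, newCount, state) -> {
          long sum = (newCount.isPresent() ? newCount.get() : 0L)
                   + (state.exists() ? state.get() : 0L);
          state.update(sum);
          return new Tuple2<>(key, sum);
        };

    return batchCounts.mapWithState(StateSpec.function(mappingFunc));
  }
}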


Spark Streaming data checkpoint performance

2015-11-01 Thread Thúy Hằng Lê
Hi Spark guru

I am evaluating Spark Streaming,

In my application I need to maintain cumulative statistics (e.g the total
running word count), so I need to call the updateStateByKey function on
very micro-batch.

After setting those things, I got following behaviors:
* The Processing Time is very high every 10 seconds - usually 5x
higher (which I guess it's data checking point job)
* The Processing Time becomes higher and higher over time, after 10
minutes it's much higher than the batch interval and lead to huge
Scheduling Delay and a lots Active Batches in queue.

My questions is:

 * Is this expected behavior? Is there any way to improve the
performance of data checking point?
 * How data checking point in Spark Streaming works? Does it need
to load all previous checking point data in order to build new one?

My job is very simple:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(2));
JavaPairInputDStream messages =
    KafkaUtils.createDirectStream(...);

JavaPairDStream stats = messages.mapToPair(parseJson)
    .reduceByKey(REDUCE_STATS)
    .updateStateByKey(RUNNING_STATS);

stats.print();
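
For reference, a minimal sketch (illustrative only) of how checkpointing is
enabled and recovered from for this kind of stateful job; the checkpoint path
is a placeholder and sparkConf is the configuration from the snippet above:

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

String checkpointDir = "hdfs:///tmp/streaming-checkpoint";  // placeholder path

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, () -> {
    JavaStreamingContext ctx =
        new JavaStreamingContext(sparkConf, Durations.seconds(2));
    ctx.checkpoint(checkpointDir);  // required by updateStateByKey; enables
                                    // metadata and data checkpointing
    // build the DStream graph here (createDirectStream, updateStateByKey, ...)
    return ctx;
});
jssc.start();
jssc.awaitTermination();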


Re: spark streaming with checkpoint

2015-01-25 Thread Balakrishnan Narendran
Yeah, use streaming to gather the incoming logs and write them to a log file,
then run a Spark job every 5 minutes to process the counts. Got it. Thanks a
lot.

On 07:07, Mon, 26 Jan 2015 Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote:

 I am a beginner to spark streaming. So have a basic doubt regarding
 checkpoints. My use case is to calculate the no of unique users by day. I
 am using reduce by key and window for this. Where my window duration is 24
 hours and slide duration is 5 mins.

 Adding to what others said, this feels more like a task for run a Spark
 job every five minutes using cron than using the sliding window
 functionality from Spark Streaming.

 Tobias




Re: spark streaming with checkpoint

2015-01-25 Thread Tobias Pfeiffer
Hi,

On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote:

 I am a beginner to spark streaming. So have a basic doubt regarding
 checkpoints. My use case is to calculate the no of unique users by day. I
 am using reduce by key and window for this. Where my window duration is 24
 hours and slide duration is 5 mins.

Adding to what others said, this feels more like a task for "run a Spark
job every five minutes using cron" than for the sliding window
functionality from Spark Streaming.

Tobias


Re: spark streaming with checkpoint

2015-01-22 Thread Balakrishnan Narendran
Thank you Jerry,
   Does the window operation create new RDDs for each slide duration?
I am asking this because I see a constant increase in memory even when
no logs are received.
If checkpointing is not the solution, is there any alternative that you would
suggest?


On Tue, Jan 20, 2015 at 7:08 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi,



 Seems you have such a large window (24 hours), so the phenomena of memory
 increasing is expectable, because of window operation will cache the RDD
 within this window in memory. So for your requirement, memory should be
 enough to hold the data of 24 hours.



 I don’t think checkpoint in Spark Streaming can alleviate such problem,
 because checkpoint are mainly for fault tolerance.



 Thanks

 Jerry



 *From:* balu.naren [mailto:balu.na...@gmail.com]
 *Sent:* Tuesday, January 20, 2015 7:17 PM
 *To:* user@spark.apache.org
 *Subject:* spark streaming with checkpoint



 I am a beginner to spark streaming. So have a basic doubt regarding
 checkpoints. My use case is to calculate the no of unique users by day. I
 am using reduce by key and window for this. Where my window duration is 24
 hours and slide duration is 5 mins. I am updating the processed record to
 mongodb. Currently I am replace the existing record each time. But I see
 the memory is slowly increasing over time and kills the process after 1 and
 1/2 hours(in aws small instance). The DB write after the restart clears all
 the old data. So I understand checkpoint is the solution for this. But my
 doubt is

- What should my check point duration be..? As per documentation it
says 5-10 times of slide duration. But I need the data of entire day. So it
is ok to keep 24 hrs.
- Where ideally should the checkpoint be..? Initially when I receive
the stream or just before the window operation or after the data reduction
has taken place.


 Appreciate your help.
 Thank you
  --

 View this message in context: spark streaming with checkpoint
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



RE: spark streaming with checkpoint

2015-01-22 Thread Shao, Saisai
Hi,

A new RDD will be created in each slide duration; if there is no data coming in, 
an empty RDD will be generated.

I'm not sure there is a way to alleviate your problem from the Spark side. Does 
your application design have to build such a large window? Can you change your 
implementation if that is easy for you?

I think it is better and easier for you to change your implementation rather than 
rely on Spark to handle this.

Thanks
Jerry

From: Balakrishnan Narendran [mailto:balu.na...@gmail.com]
Sent: Friday, January 23, 2015 12:19 AM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: spark streaming with checkpoint

Thank you Jerry,
   Does the window operation create new RDDs for each slide duration..? I 
am asking this because i see a constant increase in memory even when there is 
no logs received.
If not checkpoint is there any alternative that you would suggest.?


On Tue, Jan 20, 2015 at 7:08 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi,

Seems you have such a large window (24 hours), so the phenomena of memory 
increasing is expectable, because of window operation will cache the RDD within 
this window in memory. So for your requirement, memory should be enough to hold 
the data of 24 hours.

I don’t think checkpoint in Spark Streaming can alleviate such problem, because 
checkpoint are mainly for fault tolerance.

Thanks
Jerry

From: balu.naren [mailto:balu.na...@gmail.commailto:balu.na...@gmail.com]
Sent: Tuesday, January 20, 2015 7:17 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: spark streaming with checkpoint


I am a beginner to spark streaming. So have a basic doubt regarding 
checkpoints. My use case is to calculate the no of unique users by day. I am 
using reduce by key and window for this. Where my window duration is 24 hours 
and slide duration is 5 mins. I am updating the processed record to mongodb. 
Currently I am replace the existing record each time. But I see the memory is 
slowly increasing over time and kills the process after 1 and 1/2 hours(in aws 
small instance). The DB write after the restart clears all the old data. So I 
understand checkpoint is the solution for this. But my doubt is

  *   What should my check point duration be..? As per documentation it says 
5-10 times of slide duration. But I need the data of entire day. So it is ok to 
keep 24 hrs.
  *   Where ideally should the checkpoint be..? Initially when I receive the 
stream or just before the window operation or after the data reduction has 
taken place.

Appreciate your help.
Thank you


View this message in context: spark streaming with 
checkpointhttp://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
Sent from the Apache Spark User List mailing list 
archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: spark streaming with checkpoint

2015-01-22 Thread Jörn Franke
Maybe you are using the wrong approach - try something like HyperLogLog or bitmap
structures, as you can find them, for instance, in Redis. They are much
smaller.
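
A minimal sketch (illustrative only) of that idea, keeping one fixed-size
HyperLogLog sketch per day in the Spark Streaming state instead of the raw user
ids. It assumes the stream-lib library (com.clearspring.analytics:stream) is on
the classpath and that "dayToUserId" is a JavaPairDStream<String, String> of
(day, userId) pairs; the Optional below is Guava's class in the Spark 1.x Java
API (Spark's own in 2.x):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import java.util.List;

// State per day: a few KB of sketch instead of the full set of user ids.
Function2<List<String>, Optional<HyperLogLogPlus>, Optional<HyperLogLogPlus>> mergeUsers =
    (userIds, state) -> {
      HyperLogLogPlus hll =
          state.isPresent() ? state.get() : new HyperLogLogPlus(14);  // precision p = 14
      for (String id : userIds) {
        hll.offer(id);
      }
      return Optional.of(hll);
    };

JavaPairDStream<String, HyperLogLogPlus> uniquesPerDay =
    dayToUserId.updateStateByKey(mergeUsers);

// Approximate distinct users per day, read off the sketch.
uniquesPerDay.mapValues(HyperLogLogPlus::cardinality).print();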
On 22 Jan 2015 17:19, Balakrishnan Narendran balu.na...@gmail.com
wrote:

 Thank you Jerry,
Does the window operation create new RDDs for each slide
 duration..? I am asking this because i see a constant increase in memory
 even when there is no logs received.
 If not checkpoint is there any alternative that you would suggest.?


 On Tue, Jan 20, 2015 at 7:08 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi,



 Seems you have such a large window (24 hours), so the phenomena of memory
 increasing is expectable, because of window operation will cache the RDD
 within this window in memory. So for your requirement, memory should be
 enough to hold the data of 24 hours.



 I don’t think checkpoint in Spark Streaming can alleviate such problem,
 because checkpoint are mainly for fault tolerance.



 Thanks

 Jerry



 *From:* balu.naren [mailto:balu.na...@gmail.com]
 *Sent:* Tuesday, January 20, 2015 7:17 PM
 *To:* user@spark.apache.org
 *Subject:* spark streaming with checkpoint



 I am a beginner to spark streaming. So have a basic doubt regarding
 checkpoints. My use case is to calculate the no of unique users by day. I
 am using reduce by key and window for this. Where my window duration is 24
 hours and slide duration is 5 mins. I am updating the processed record to
 mongodb. Currently I am replace the existing record each time. But I see
 the memory is slowly increasing over time and kills the process after 1 and
 1/2 hours(in aws small instance). The DB write after the restart clears all
 the old data. So I understand checkpoint is the solution for this. But my
 doubt is

- What should my check point duration be..? As per documentation it
says 5-10 times of slide duration. But I need the data of entire day. So 
 it
is ok to keep 24 hrs.
- Where ideally should the checkpoint be..? Initially when I receive
the stream or just before the window operation or after the data reduction
has taken place.


 Appreciate your help.
 Thank you
  --

 View this message in context: spark streaming with checkpoint
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





spark streaming with checkpoint

2015-01-20 Thread balu.naren
I am a beginner to Spark Streaming, so I have a basic doubt regarding
checkpoints. My use case is to calculate the number of unique users per day. I am
using reduce-by-key-and-window for this, where my window duration is 24
hours and the slide duration is 5 minutes. I am writing the processed record to
MongoDB. Currently I replace the existing record each time. But I see the
memory slowly increasing over time, and it kills the process after 1.5
hours (on an AWS small instance). The DB write after the restart clears all the
old data. So I understand checkpointing is the solution for this. But my doubts
are:

 * What should my checkpoint duration be? The documentation says 5-10
times the slide duration, but I need the data of the entire day. So is it OK to
keep it at 24 hours?
 * Where ideally should the checkpoint be? Initially, when I receive the
stream, or just before the window operation, or after the data reduction has
taken place?

Appreciate your help.
Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
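
A minimal sketch (illustrative only) of the mechanics being asked about,
assuming a JavaPairDStream<String, Long> named "userEvents" of (userId, 1L)
pairs: the checkpoint interval is set on the DStream produced by the windowed
reduction, typically as a small multiple of the slide duration as the docs
suggest.

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

JavaPairDStream<String, Long> daily = userEvents.reduceByKeyAndWindow(
    (a, b) -> a + b,               // reduce function
    Durations.minutes(24 * 60),    // window duration: 24 hours
    Durations.minutes(5));         // slide duration: 5 minutes

daily.checkpoint(Durations.minutes(25));  // e.g. 5x the slide duration

daily.count().print();  // number of distinct user ids in the current 24h window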

RE: spark streaming with checkpoint

2015-01-20 Thread Shao, Saisai
Hi,

Since you have such a large window (24 hours), the increase in memory is 
expected, because the window operation will cache the RDDs within this window in 
memory. So for your requirement, memory should be enough to hold the data of 24 
hours.

I don't think checkpointing in Spark Streaming can alleviate such a problem, 
because checkpoints are mainly for fault tolerance.

Thanks
Jerry

From: balu.naren [mailto:balu.na...@gmail.com]
Sent: Tuesday, January 20, 2015 7:17 PM
To: user@spark.apache.org
Subject: spark streaming with checkpoint


I am a beginner to spark streaming. So have a basic doubt regarding 
checkpoints. My use case is to calculate the no of unique users by day. I am 
using reduce by key and window for this. Where my window duration is 24 hours 
and slide duration is 5 mins. I am updating the processed record to mongodb. 
Currently I am replace the existing record each time. But I see the memory is 
slowly increasing over time and kills the process after 1 and 1/2 hours(in aws 
small instance). The DB write after the restart clears all the old data. So I 
understand checkpoint is the solution for this. But my doubt is

  *   What should my check point duration be..? As per documentation it says 
5-10 times of slide duration. But I need the data of entire day. So it is ok to 
keep 24 hrs.
  *   Where ideally should the checkpoint be..? Initially when I receive the 
stream or just before the window operation or after the data reduction has 
taken place.

Appreciate your help.
Thank you


View this message in context: spark streaming with 
checkpointhttp://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
Sent from the Apache Spark User List mailing list 
archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.