Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Adrian Tanase
This section of the streaming guide also outlines another option – running 2 versions 
in parallel for a period of time and controlling the draining / transition at the 
application level.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code

Also – I would not dismiss the graceful shutdown approach, since you’re the one 
controlling the shutdown.
At a minimum, you can monitor whether it succeeded, and if it failed, simply 
restart the app, relying on checkpoint recovery, before trying again…
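
For reference, the driver can watch for an external stop signal and only exit once 
the graceful stop returns – a rough sketch below; the marker file path and polling 
interval are illustrative, not from the original app:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("my-app"), Seconds(10))
    // ... build the streaming job here (sources, transformations, outputs) ...
    ssc.start()

    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
    val stopMarker = new Path("/tmp/my-app/stop-marker") // illustrative trigger

    var stopped = false
    while (!stopped) {
      // returns true once the context has terminated
      stopped = ssc.awaitTerminationOrTimeout(10000)
      if (!stopped && fs.exists(stopMarker)) {
        // stopGracefully = true: finish all received-but-unprocessed batches first
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}

If the stop fails or the process dies, the checkpoint is still intact, so you can 
restart and try again.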

I’m copy-pasting more details from an answer I posted earlier to a similar 
question:

  1.  Use 2 versions in parallel, drain the queue up to a point and start fresh 
in the new version, only processing events from that point forward.
     *   Note that “up to a point” is specific to your state management logic; 
it might mean “user sessions started after 4 am”, NOT “events received after 4 am”.
  2.  Graceful shutdown and saving data to DB, followed by checkpoint cleanup / 
new checkpoint dir.
     *   On restart, you need to use the updateStateByKey overload that takes an 
initialRDD with the values preloaded from the DB (see the sketch below).
     *   By cleaning the checkpoint between upgrades, the data is loaded only once.
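
For point 2, a minimal sketch of the restart path – assuming the state is plain 
(key -> count) pairs; loadStateFromDB and events are placeholders for your DB read 
and the new Kafka stream:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext

// Placeholder: read back the state snapshot persisted at shutdown
def loadStateFromDB(ssc: StreamingContext): RDD[(String, Long)] = ???

val initialState: RDD[(String, Long)] = loadStateFromDB(ssc)

val updateFunc = (values: Seq[Long], state: Option[Long]) =>
  Some(values.sum + state.getOrElse(0L))

// The updateStateByKey overload that seeds the state from an initial RDD,
// so the new version starts from the DB snapshot instead of the old checkpoint
val counts = events.updateStateByKey(
  updateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  initialState)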

Hope this helps,
-adrian

From: Ofir Kerker
Date: Wednesday, September 16, 2015 at 6:12 PM
To: Cody Koeninger
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Spark Streaming application code change and stateful 
transformations

Thanks Cody!
The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping, in addition to the 'last-complete-hour', the 
corresponding offsets that bound the incomplete data, so that in the worst case I 
only have to fast-forward through the last couple of hours.

On Mon, Sep 14, 2015 at 22:14 Cody Koeninger <c...@koeninger.org> wrote:
Solution 2 sounds better to me.  You aren't always going to have graceful 
shutdowns.

On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker <ofir.ker...@gmail.com> wrote:
Hi,
My Spark Streaming application consumes messages (events) from Kafka every
10 seconds using the direct stream approach, aggregates these messages
into hourly aggregations (to answer analytics questions like: "How many
users from Paris visited page X between 8PM and 9PM?") and saves the data to
Cassandra.
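
Simplified, the job looks roughly like this (the topic, table and parseKey names 
are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.streaming._

// Direct (receiver-less) stream from Kafka
val events = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("page-views"))

// Key each event by (hour, page, city) and keep a running count per key;
// parseKey stands in for our JSON parsing
val hourly = events
  .map { case (_, json) => (parseKey(json), 1L) }
  .updateStateByKey((values: Seq[Long], state: Option[Long]) =>
    Some(values.sum + state.getOrElse(0L)))

// Write each batch's aggregates out with the spark-cassandra-connector
hourly.saveToCassandra("analytics", "hourly_page_views")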

I was wondering if there's a good practice for handling a code change in a
Spark Streaming application that uses stateful transformations
(updateStateByKey, for example), because the new application code will not be
able to use the data that was checkpointed by the former application.
I have thought of a few solutions for this issue and was hoping some of you
have experience with such a case and can suggest other solutions or give
feedback on my suggested ones:
*Solution #1*: On a graceful shutdown, in addition to the current Kafka
offsets, persist the current aggregated data into Cassandra tables
(separate from the regular aggregation tables) that allow reading it back
easily when the new application starts, in order to build the initial
state.
*Solution #2*: When an hour is "complete" (i.e. no more events with that
hour's timestamp are expected), record the last-complete-hour somewhere
persistent (DB / shared file). This will allow me, when the new application
starts, to read all the events from Kafka from the beginning of the retention
period (the last X hours) and ignore events with a timestamp smaller than or
equal to the last-complete-hour.
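
Roughly (a sketch – readLastCompleteHour and eventHour stand in for the DB/file 
read and the timestamp extraction):

// New version starts with "auto.offset.reset" -> "smallest" so Kafka is
// replayed from the beginning of the retention period, then drops
// everything at or before the persisted watermark
val lastCompleteHour: Long = readLastCompleteHour()

val fresh = events.filter { case (_, event) =>
  eventHour(event) > lastCompleteHour
}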

I'll be happy to get your feedback!

Thanks,
Ofir








Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Cody Koeninger
The reason I'm dismissing the graceful shutdown approach is that if your
app crashes, and can't be restarted without code changes (e.g. a bug needs
to be fixed), you're screwed.



Re: Spark Streaming application code change and stateful transformations

2015-09-16 Thread Ofir Kerker
Thanks Cody!
The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping, in addition to the 'last-complete-hour', the
corresponding offsets that bound the incomplete data, so that in the worst case I
only have to fast-forward through the last couple of hours.
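
Something like this, I think (a sketch against the Spark 1.x direct stream API; 
loadBoundingOffsets stands in for reading the offsets I'd persist alongside the 
'last-complete-hour'):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Offsets saved together with 'last-complete-hour', bounding the incomplete data
val fromOffsets: Map[TopicAndPartition, Long] = loadBoundingOffsets()

// Restart the stream exactly at the saved offsets instead of replaying the
// whole retention period, so only the incomplete tail is reprocessed
val events = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))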



Re: Spark Streaming application code change and stateful transformations

2015-09-14 Thread Cody Koeninger
Solution 2 sounds better to me.  You aren't always going to have graceful
shutdowns.
