Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
Hi Gabor,

The sinks should be aware that the global checkpoint is indeed persisted before 
emitting, so they will have to wait until they are notified of its completion 
before pushing to Kafka. The current view of the local state is not the actual 
persisted view, so checking against it is like relying on dirty state. Imagine 
the following scenario:

1) the sink pushes record k to Kafka and updates its local buffer for k
2) the sink snapshots k and the rest of its state on the checkpoint barrier
3) the global checkpoint fails for some reason (e.g. another sink subtask 
failed) and the job gets restarted
4) the sink pushes record k to Kafka again, since the last global snapshot did 
not complete and k is not in the local buffer
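The four steps above can be replayed in a few lines; all names here are illustrative stand-ins, not Flink APIs. The point is that the local buffer is dirty state: on restart it is reset, because the snapshot that contained it never completed globally.

```python
# Illustrative sketch: deduplicating against a *local* (unpersisted) buffer
# still allows duplicates when the global checkpoint fails.
kafka_out = []                     # records actually pushed to Kafka
local_buffer = set()               # sink's in-memory dedup state
last_completed_checkpoint = set()  # what the last *completed* checkpoint saw

def push(record):
    if record not in local_buffer:
        kafka_out.append(record)   # push first ...
        local_buffer.add(record)   # ... then remember it locally

push("k")                          # 1) k pushed and buffered locally
snapshot = set(local_buffer)       # 2) local snapshot taken on the barrier
# 3) global checkpoint FAILS (another subtask failed): snapshot is discarded
#    and the job restarts from the last completed checkpoint
local_buffer = set(last_completed_checkpoint)
push("k")                          # 4) k is pushed AGAIN: duplicate
assert kafka_out == ["k", "k"]    # exactly-once is violated
```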

Chesnay’s approach seems to be valid and best effort for the time being.

Paris

> On 05 Feb 2016, at 13:09, Gábor Gévay  wrote:
> 
> Hello,
> 
>> I think that there is actually a fundamental latency issue with
>> "exactly once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well,
>> to a specific point where you are sure no replay will ever be needed.
> 
> What if the persistent buffer in the sink would be used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives, and also writes
> everything to the persistent buffer, and then in case of a replay it
> looks into the buffer before pushing every element, and only does the
> push if the buffer says that the element was not pushed before.
> 
> Best,
> Gábor
> 
> 
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen :
>> Hi Niels!
>> 
>> In general, exactly once output requires transactional cooperation from the
>> target system. Kafka has that on the roadmap, we should be able to integrate
>> that once it is out.
>> That means output is "committed" upon completed checkpoints, which
>> guarantees nothing is written multiple times.
>> 
>> Chesnay is working on an interesting prototype as a generic solution (also
>> for Kafka, while they don't have that feature):
>> It buffers the data in the sink persistently (using the fault tolerance
>> state backends) and pushes the results out on notification of a completed
>> checkpoint.
>> That gives you exactly once semantics, but involves an extra materialization
>> of the data.
>> 
>> 
>> I think that there is actually a fundamental latency issue with "exactly
>> once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well, to a
>> specific point where you are sure no replay will ever be needed.
>> 
>> So the latency in Flink for an exactly-once output would be at least the
>> checkpoint interval.
>> 
>> I'm eager to hear your thoughts on this.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes  wrote:
>>> 
>>> Hi,
>>> 
>>> It is my understanding that the exactly-once semantics regarding the input
>>> from Kafka is based on the checkpointing in the source component retaining
>>> the offset where it was at the checkpoint moment.
>>> 
>>> My question is how does that work for a sink? How can I make sure that (in
>>> light of failures) each message that is read from Kafka (my input) is
>>> written to Kafka (my output) exactly once?
>>> 
>>> 
>>> --
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>> 
>> 



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
That would be good indeed. I just learned about it from what Stephan mentioned. It 
sounds correct to me along those lines, but it would be nice to see the details.

> On 05 Feb 2016, at 13:32, Ufuk Celebi  wrote:
> 
> 
>> On 05 Feb 2016, at 13:28, Paris Carbone  wrote:
>> 
>> Hi Gabor,
>> 
>> The sinks should be aware that the global checkpoint is indeed persisted before 
>> emitting, so they will have to wait until they are notified of its 
>> completion before pushing to Kafka. The current view of the local state is 
>> not the actual persisted view, so checking against it is like relying on dirty 
>> state. Imagine the following scenario:
>> 
>> 1) the sink pushes record k to Kafka and updates its local buffer for k
>> 2) the sink snapshots k and the rest of its state on the checkpoint barrier
>> 3) the global checkpoint fails for some reason (e.g. another sink subtask 
>> failed) and the job gets restarted
>> 4) the sink pushes record k to Kafka again, since the last global snapshot did 
>> not complete and k is not in the local buffer
>> 
>> Chesnay’s approach seems to be valid and best effort for the time being.
> 
> Chesnay’s approach is not part of this thread. Can you or Chesnay 
> elaborate/provide a link?
> 
> – Ufuk
> 



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
From what I understood, the state on sinks is included in the operator state of the 
sinks and pushed to Kafka when the 3-phase commit is complete,
i.e. when the checkpoint completion notification arrives at the sinks. 

There are several pitfalls I am really curious to check and see how they are 
(going to be) handled; this is of course not as simple as it sounds. It really 
depends on the guarantees and operations the outside storage gives you. For 
example, how can we know that the pushed records are actually persisted in 
Kafka in a single transaction?

@Chesnay can you tell us more?




Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
This is not a bad take. It still makes a few assumptions:

1) the sink checkpoints the ID of the last record *known* to be *persisted* in 
Kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <ni...@basjes.nl> wrote:

Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink would 
effectively revert to micro-batching (where the batch size is the checkpoint 
period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The sink persists the ID of the last message it wrote to Kafka in the 
checkpoint.
2) Upon recovery the sink would:
2a) Record the offset Kafka is at at that point in time.
2b) For each 'new' message, validate whether it must be written by reading 
from Kafka (starting at the offset in the checkpoint); if the message is 
already present, skip it.
3) If a message arrives that has not yet been written, it is written. Under 
the assumption that the messages arrive in the same order as before, the sink 
can now simply run as normal.

This way, performance is only impacted in the (short) period after recovery 
from a disturbance.
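As a rough sketch of this recovery procedure (illustrative names only, not a real Flink or Kafka API; a plain list stands in for the output topic, and deterministic replay order is assumed):

```python
# Illustrative sketch: after restart, the sink replays the output topic from
# the checkpointed position and skips messages it finds already present.
class RecoveringSink:
    def __init__(self, out_topic, checkpointed_offset):
        self.out = out_topic                      # list stands in for the topic
        self.replay_pos = checkpointed_offset     # offset stored in the checkpoint
        self.recovering = self.replay_pos < len(self.out)

    def write(self, msg):
        if self.recovering:
            if self.out[self.replay_pos] == msg:  # already written before failure
                self.replay_pos += 1
                if self.replay_pos == len(self.out):
                    self.recovering = False       # caught up: back to normal
                return                            # skip the duplicate
            self.recovering = False               # relies on deterministic order

        self.out.append(msg)

topic = ["m1", "m2", "m3"]   # m3 was written after the last checkpoint
sink = RecoveringSink(topic, checkpointed_offset=2)  # checkpoint knew up to m2
for msg in ["m3", "m4"]:     # replayed input after recovery
    sink.write(msg)
assert topic == ["m1", "m2", "m3", "m4"]   # m3 was not duplicated
```

Note that the dedup cost is paid only while catching up to the pre-failure position, matching the "short period after recovery" claim above.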

What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
Hi Niels!

In general, exactly once output requires transactional cooperation from the 
target system. Kafka has that on the roadmap, we should be able to integrate 
that once it is out.
That means output is "committed" upon completed checkpoints, which guarantees 
nothing is written multiple times.

Chesnay is working on an interesting prototype as a generic solution (also for 
Kafka, while they don't have that feature):
It buffers the data in the sink persistently (using the fault tolerance state 
backends) and pushes the results out on notification of a completed checkpoint.
That gives you exactly once semantics, but involves an extra materialization of 
the data.
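The buffering approach described above can be sketched as follows; the class and method names here are illustrative, not the actual prototype or Flink's sink interface.

```python
# Illustrative sketch: a sink that buffers records in (persistently
# checkpointed) state and emits them only once the covering checkpoint is
# globally complete, so no replay can ever re-produce them.
class BufferingSink:
    def __init__(self, producer):
        self.producer = producer      # stand-in for a Kafka producer
        self.pending = []             # records received since the last barrier
        self.checkpointed = {}        # checkpoint_id -> buffered records

    def invoke(self, record):
        self.pending.append(record)   # buffer instead of pushing immediately

    def snapshot_state(self, checkpoint_id):
        # persisted via the state backend together with the checkpoint
        self.checkpointed[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # only now is it safe to emit downstream
        for record in self.checkpointed.pop(checkpoint_id, []):
            self.producer.append(record)

out = []
sink = BufferingSink(out)
sink.invoke("a"); sink.invoke("b")
sink.snapshot_state(1)               # barrier for checkpoint 1
sink.invoke("c")                     # belongs to the *next* checkpoint
sink.notify_checkpoint_complete(1)
assert out == ["a", "b"]             # "c" waits for checkpoint 2
```

This makes the latency point below concrete: a record is only visible downstream after the checkpoint that covers it completes, i.e. at least one checkpoint interval later.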


I think that there is actually a fundamental latency issue with "exactly once 
sinks", no matter how you implement them in any systems:
You can only commit once you are sure that everything went well, to a specific 
point where you are sure no replay will ever be needed.

So the latency in Flink for an exactly-once output would be at least the 
checkpoint interval.

I'm eager to hear your thoughts on this.

Greetings,
Stephan





Re: FYI: Updated Slides Section

2016-04-04 Thread Paris Carbone
Some people might find my slides on the FT fundamentals from last summer 
interesting. If you like it feel free to include it.

http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha

Paris

On 04 Apr 2016, at 11:33, Ufuk Celebi <u...@apache.org> wrote:

Dear Flink community,

I have updated the Material section on the Flink project page and
moved the slides section to a separate page.

You can find links to slides and talks here now:
http://flink.apache.org/slides.html

I've added slides for talks from this year by Till Rohrmann, Vasia
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
that something is missing, feel free to ping in this thread.

– Ufuk



Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
Hi Stavros,

Currently, rollback failure recovery in Flink works at the pipeline level, not 
at the task level (see MillWheel [1]). It further builds on replayable stream 
logs (i.e. Kafka); thus, there is no need for 3PC or backups at the pipeline 
sources. You can also check this presentation [2], which I hope explains the basic 
concepts in more detail. Mind that many upcoming optimisation 
opportunities are going to be addressed in the not-so-long-term Flink roadmap.

Paris

[1] 
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
[2] 
http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha




On 19 May 2016, at 19:43, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

Cool, thanks. So if a checkpoint expires, does the pipeline block or fail in total, 
does only the specific task related to the operator (running along with the 
checkpoint task) fail, or does nothing happen?

On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rmetz...@apache.org> wrote:
Hi Stavros,

I haven't implemented our checkpointing mechanism and I didn't participate in 
the design decisions while implementing it, so I can not compare it in detail 
to other approaches.

From a "does it work" perspective: checkpoints are only confirmed if all 
parallel subtasks successfully created a valid snapshot of the state. So if 
there is a failure in the checkpointing mechanism, no valid checkpoint will be 
created. The system will recover from the last valid checkpoint.
There is a timeout for checkpoints. So if a barrier doesn't pass through the 
system for a certain period of time, the checkpoint is cancelled. The default 
timeout is 10 minutes.

Regards,
Robert


On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:
Hi,

I was looking into the flink snapshotting algorithm details also mentioned here:
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html

From other sources I understand that it assumes no failures in order to work, 
e.g. in message delivery, or a process hanging forever:
https://en.wikipedia.org/wiki/Snapshot_algorithm
https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

So my understanding (which may be wrong) is that this solution does not seem to 
address fault tolerance as strongly as, for example, using a 3PC protocol for 
local state propagation and global agreement. I know the latter is not 
efficient; I mention it only for comparison.

How does the algorithm behave in practical terms in the presence of its own 
failures (this is a background process collecting partial states)? Are there 
timeouts for reaching a barrier?

PS. have not looked deep into the code details yet, planning to.

Best,
Stavros






Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone

Regarding your last question,
If a checkpoint expires it just gets invalidated and a new complete checkpoint 
will eventually occur that can be used for recovery. If I am wrong, or 
something has changed please correct me.

Paris








Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
True. If you like formal modelling and the like, you can think of it as a 
more relaxed/abortable operation (e.g. like abortable consensus) that yields 
the same guarantees and works fine in semi-synchronous distributed systems (as in 
the case of Flink).

On 19 May 2016, at 20:22, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

Hey, thanks for the links. There are assumptions though, like reliable channels... 
since you rely on TCP in practice, and if a checkpoint fails or is very slow 
then you need to deal with it; that's why I asked previously what happens 
then. 3PC does not need such assumptions, I think, but engineering is more 
practical (as it should be) and a different story in general.
[2] mentions the assumptions as well.

Best,
Stavros









Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
In that case, typically a timeout invalidates the whole snapshot (all states 
for the same epoch) until eventually we have a full complete snapshot.
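The confirmation logic being discussed in this thread can be sketched roughly as follows; this is an illustrative toy, not Flink's actual CheckpointCoordinator. A checkpoint is confirmed only once every parallel subtask has acknowledged it, and a timeout invalidates the whole epoch, after which a later checkpoint is simply attempted instead.

```python
# Illustrative sketch: all-or-nothing confirmation of snapshot epochs.
class Coordinator:
    def __init__(self, num_subtasks):
        self.num_subtasks = num_subtasks
        self.pending = {}        # checkpoint_id -> set of acked subtasks
        self.completed = []      # checkpoints usable for recovery

    def trigger(self, checkpoint_id):
        self.pending[checkpoint_id] = set()

    def ack(self, checkpoint_id, subtask):
        acks = self.pending.get(checkpoint_id)
        if acks is None:
            return               # late ack for an invalidated epoch: ignored
        acks.add(subtask)
        if len(acks) == self.num_subtasks:
            del self.pending[checkpoint_id]
            self.completed.append(checkpoint_id)

    def expire(self, checkpoint_id):
        # one slow/blocked subtask invalidates the *whole* snapshot epoch
        self.pending.pop(checkpoint_id, None)

coord = Coordinator(num_subtasks=3)
coord.trigger(1)
coord.ack(1, 0); coord.ack(1, 1)   # subtask 2 is blocked...
coord.expire(1)                    # ...so epoch 1 is invalidated entirely
coord.trigger(2)
for s in range(3):
    coord.ack(2, s)
assert coord.completed == [2]      # recovery uses the latest complete epoch
```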

On 19 May 2016, at 20:26, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

"Checkpoints are only confirmed if all parallel subtasks successfully created a 
valid snapshot of the state.", as stated by Robert. So to rephrase my 
question: how is it confirmed that all snapshots are finished, and what 
happens if some task is very slow... or is blocked?
If you have N tasks confirmed and one missing, what do you do? Do you start a new 
checkpoint for that one, or a global new checkpoint for the rest of the N tasks as 
well?





Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
Invalidations are not necessarily exposed (I hope). Think of it as implementing 
TCP: you don’t have to warn the user that packets are lost, since eventually a 
packet will be received at the other side in an eventually synchronous system. 
Snapshots follow the same paradigm. Hope that helps.

On 19 May 2016, at 20:33, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

Yes, that's what I was thinking, thanks. When people hear "exactly once" they think: 
are you sure? There is something hidden there... because theory is theory :)
So if you keep getting invalidated snapshots while data passes through the operators, 
do you issue a warning, or fail the pipeline and return an exception to the driver?



Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
Hi Abhishek,
I don’t see the problem there (also, this is unrelated to the snapshotting 
protocol).
Intuitively, if you submit a copy of your state (full or delta) for a snapshot 
version/epoch to a store backend, and validate the full snapshot for that 
version when you eventually receive the acknowledgements, this still works fine. 
Am I missing something?

On 19 May 2016, at 20:36, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:

I was wondering how checkpoints can be async? Because your state is constantly 
mutating. You probably need versioned state, or immutable data structs?

-Abhishek-
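Abhishek's question points at exactly the right idea: asynchronous checkpointing needs a point-in-time (versioned or copy-on-write) view of the state. A minimal toy illustration, not Flink's internals and with made-up names: the operator copies its state synchronously (the "frozen version") and persists it on a background thread while the live state keeps mutating.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy asynchronous snapshot: take a point-in-time copy of the state
// synchronously, then "persist" it on a background thread while the
// operator keeps mutating its live state.
public class AsyncSnapshot {
    private final Map<String, Long> liveState = new HashMap<>();
    private final ExecutorService writer = Executors.newSingleThreadExecutor();

    public void update(String key, long value) {
        liveState.put(key, value);
    }

    // The copy is cheap relative to I/O; writing it out happens
    // asynchronously while processing continues.
    public Future<Map<String, Long>> snapshotAsync() {
        final Map<String, Long> frozen = new HashMap<>(liveState);
        return writer.submit(() -> frozen); // stand-in for writing to a state backend
    }

    public void close() {
        writer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshot op = new AsyncSnapshot();
        op.update("k", 1L);
        Future<Map<String, Long>> snap = op.snapshotAsync();
        op.update("k", 2L); // mutation after the snapshot was taken...
        System.out.println(snap.get().get("k")); // ...is not in the snapshot: prints 1
        op.close();
    }
}
```

Real implementations avoid the full copy with copy-on-write or immutable data structures, but the versioning idea is the same.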

On May 19, 2016, at 11:14 AM, Paris Carbone <par...@kth.se> wrote:

Hi Stavros,

Currently, rollback failure recovery in Flink works at the pipeline level, not 
at the task level (see MillWheel [1]). It further builds on replayable stream 
logs (i.e. Kafka); thus, there is no need for 3PC or backups in the pipeline 
sources. You can also check this presentation [2], which I hope explains the 
basic concepts in more detail. Mind that many upcoming optimisation 
opportunities are going to be addressed in the not-so-long-term Flink roadmap.

Paris

[1] 
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
[2] 
http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha

On 19 May 2016, at 19:43, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

Cool, thanks. So if a checkpoint expires, does the pipeline block or fail as a 
whole, does only the specific task related to the operator (running along with 
the checkpoint task) fail, or does nothing happen?

On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rmetz...@apache.org> wrote:
Hi Stavros,

I haven't implemented our checkpointing mechanism and I didn't participate in 
the design decisions while implementing it, so I cannot compare it in detail 
to other approaches.

From a "does it work" perspective: Checkpoints are only confirmed if all 
parallel subtasks successfully created a valid snapshot of the state. So if 
there is a failure in the checkpointing mechanism, no valid checkpoint will be 
created. The system will recover from the last valid checkpoint.
There is a timeout for checkpoints. So if a barrier doesn't pass through the 
system for a certain period of time, the checkpoint is cancelled. The default 
timeout is 10 minutes.

Regards,
Robert


On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:
Hi,

I was looking into the flink snapshotting algorithm details also mentioned here:
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html

From other sources I understand that it assumes no failures in message 
delivery and no processes hanging forever:
https://en.wikipedia.org/wiki/Snapshot_algorithm
https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

So my understanding (maybe wrong) is that this solution does not address the 
fault-tolerance issue as strongly as, for example, a 3PC protocol for local 
state propagation and global agreement would. I know the latter is not 
efficient; I mention it only for comparison.

How does the algorithm behave in practical terms in the presence of its own 
failures (it is a background process collecting partial states)? Are there 
timeouts for reaching a barrier?

PS. I have not looked deeply into the code details yet; planning to.

Best,
Stavros








Re: flink snapshotting fault-tolerance

2016-05-19 Thread Paris Carbone
Sure, in practice you can set a threshold of retries, since an operator 
implementation could cause this indefinitely, or any other reason could make 
snapshotting generally infeasible. If I recall correctly, that threshold 
exists in the Flink configuration.

On 19 May 2016, at 20:42, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

The problem here is different though: if something keeps failing (permanently), 
in practice someone needs to be notified. If the user loses snapshotting, they 
must know.

On Thu, May 19, 2016 at 9:36 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:
I was wondering how checkpoints can be async? Because your state is constantly 
mutating. You probably need versioned state, or immutable data structs?

-Abhishek-










Re: sampling function

2016-07-12 Thread Paris Carbone
Hey Do,

I think that more sophisticated samplers would be a better fit for the ML 
library than for the core API, but I am not very familiar with the milestones 
there.
Maybe the maintainers of the batch ML library could check whether sampling 
techniques would be useful there, I guess.

Paris

> On 11 Jul 2016, at 16:15, Le Quoc Do  wrote:
> 
> Hi all,
> 
> Thank you all for your answers.
> By the way, I also recognized that Flink doesn't support a "stratified
> sampling" function (only simple random sampling) for DataSet.
> It would be nice if someone could create a Jira for it and assign the task
> to me so that I can work on it.
> 
> Thank you,
> Do
> 
> On Mon, Jul 11, 2016 at 11:44 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
> 
>> Hi Do,
>> 
>> Paris and Martha worked on sampling techniques for data streams on Flink
>> last year. If you want to implement your own samplers, you might find
>> Martha's master thesis helpful [1].
>> 
>> -Vasia.
>> 
>> [1]: http://kth.diva-portal.org/smash/get/diva2:910695/FULLTEXT01.pdf
>> 
>> On 11 July 2016 at 11:31, Kostas Kloudas 
>> wrote:
>> 
>>> Hi Do,
>>> 
>>> In DataStream you can always implement your own
>>> sampling function, hopefully without too much effort.
>>> 
>>> Adding such functionality it to the API could be a good idea.
>>> But given that in sampling there is no “one-size-fits-all”
>>> solution (as not every use case needs random sampling and not
>>> all random samplers fit to all workloads), I am not sure if we
>>> should start adding different sampling operators.
>>> 
>>> Thanks,
>>> Kostas
>>> 
 On Jul 9, 2016, at 5:43 PM, Greg Hogan  wrote:
 
 Hi Do,
 
 DataSet provides a stable @Public interface. DataSetUtils is marked
 @PublicEvolving which is intended for public use, has stable behavior,
>>> but
 method signatures may change. It's also good to limit DataSet to common
 methods whereas the utility methods tend to be used for specific
 applications.
 
 I don't have the pulse of streaming but this sounds like a useful
>> feature
 that could be added.
 
 Greg
 
 On Sat, Jul 9, 2016 at 10:47 AM, Le Quoc Do 
>> wrote:
 
> Hi all,
> 
> I'm working on approximate computing using sampling techniques. I
> recognized that Flink supports the sample function for Dataset
> (org/apache/flink/api/java/utils/DataSetUtils.java). I'm just
>> wondering
>>> why
> you didn't merge the function to
>> org/apache/flink/api/java/DataSet.java
> since the sample function works as a transformation operator?
> 
> The second question is that are you planning to support the sample
> function for DataStream (within windows) since I did not see it in
> DataStream code ?
> 
> Thank you,
> Do
> 
>>> 
>>> 
>> 
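The stratified sampling Do asks for can be sketched as per-stratum reservoir sampling (Algorithm R applied independently per key): keep at most k elements per stratum, each with equal retention probability. This is a hypothetical illustration, not the DataSetUtils API; the class name is made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Per-stratum reservoir sampling: at most k elements are kept for each
// stratum, and each element of a stratum is retained with equal probability.
public class StratifiedSampler<K, V> {
    private final int k;
    private final Random rnd;
    private final Map<K, List<V>> reservoirs = new HashMap<>();
    private final Map<K, Integer> seen = new HashMap<>();

    public StratifiedSampler(int k, long seed) {
        this.k = k;
        this.rnd = new Random(seed);
    }

    public void add(K stratum, V value) {
        int n = seen.merge(stratum, 1, Integer::sum); // elements seen in this stratum
        List<V> res = reservoirs.computeIfAbsent(stratum, x -> new ArrayList<>());
        if (res.size() < k) {
            res.add(value);            // reservoir not yet full
        } else {
            int j = rnd.nextInt(n);    // keep new element with probability k/n
            if (j < k) res.set(j, value);
        }
    }

    public List<V> sample(K stratum) {
        return reservoirs.getOrDefault(stratum, Collections.emptyList());
    }

    public static void main(String[] args) {
        StratifiedSampler<String, Integer> s = new StratifiedSampler<>(2, 42L);
        for (int i = 0; i < 100; i++) s.add(i % 2 == 0 ? "even" : "odd", i);
        System.out.println(s.sample("even").size()); // 2
        System.out.println(s.sample("odd").size());  // 2
    }
}
```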



Re: Data point goes missing within iteration

2016-07-20 Thread Paris Carbone
This is possibly related to the way the queue between StreamIterationTail and 
StreamIterationHead is currently implemented.
I think this part is a bit prone to record loss when things get wacky and 
backpressure kicks in (but at least it avoids deadlocks, right?).

I don’t have the time to look into the code right now, but I am going to focus 
on progress/loops by September, hopefully with a FLIP, which solves that part 
as well.

If this is urgent, please go ahead and check this now; I think that queue 
timeouts cause this...

Paris

PS: in my still-incomplete PR (I know, I know) I basically disabled queue 
polling timeouts, since the checkpoint overhead on the StreamIterationHead 
almost always led to record loss.
https://github.com/apache/flink/pull/1668

On 20 Jul 2016, at 11:57, Maximilian Michels <m...@apache.org> wrote:

CC Gyula and Paris in case they might want to help out.

On Tue, Jul 19, 2016 at 11:43 AM, Biplob Biswas <revolutioni...@gmail.com> wrote:
Hi Ufuk,

Thanks for the update, is there any known way to fix this issue? Any
workaround that you know of, which I can try?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8015.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Evolution algorithm on flink

2016-10-13 Thread Paris Carbone
Very interesting! 
Thank you for sharing Andrew!

Paris

> On Oct 13, 2016, at 11:00 AM, Andrew Ge Wu  wrote:
> 
> Hi guys
> 
> I just published my code to maven central, open source ofc.
> I try to make this as generic as possible. 
> If you are interested, please try it out, and help me to improve this!
> https://github.com/CircuitWall/machine-learning
> 
> 
> 
> Thanks!
> 
> 
> Andrew


Re: Flink Material & Papers

2016-11-21 Thread Paris Carbone
+1 for the references

Imho the most relevant scientific publication related to the current state of 
the art of Flink is the first one cited by Dominik (the IEEE Bulletin paper), 
so it makes sense to cite that one.

However, Hanna, if you are also interested in the prior work at TU Berlin that 
bootstrapped the Flink project, check the papers on the Stratosphere system 
[1-4].
These explain and focus on the DataSet (batch) processing programming model 
and optimisation framework that is accessible in Flink.

[1] 
http://barbie.uta.edu/~hdfeng/bigdata/Papers/The%20Stratosphere%20platform%20for%20big%20data%20analytics.pdf
[2] https://arxiv.org/pdf/1208.0088.pdf
[3] https://arxiv.org/pdf/1208.0087.pdf
[4] http://dl.acm.org/citation.cfm?id=2621940

Paris

On 21 Nov 2016, at 18:03, Dominik Safaric <dominiksafa...@gmail.com> wrote:

Hi Hanna,

I would certainly recommend, if you haven’t so far, checking the official 
Flink docs at flink.apache.org. The documentation is comprehensive and 
understandable.

From that point, I would recommend the following blog posts and academic papers:


   *   Apache Flink: Stream and Batch Processing in a Single Engine - 
http://sites.computer.org/debull/A15dec/p28.pdf
   *   Lightweight Asynchronous Snapshots for Distributed Dataflows - 
https://arxiv.org/pdf/1506.08603v1.pdf
   *   https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
   *   
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
   *   
https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html

In addition, I would suggest reading the Realtime Data Processing at Facebook 
paper, which describes some of the important characteristics of stream 
processing engines that apply to Flink as well.

Regards,
Dominik

On 21 Nov 2016, at 17:53, Hanna Prinz <hanna_pr...@yahoo.de> wrote:

Guten Abend everyone,

I’m currently writing a term paper about Flink at the HTW Berlin and I wanted 
to ask you if you can help with papers (or other material) about Flink. I could 
also come over to the TU if someone's doing a lecture about Flink.

And now that I’m writing you: I accidentally ran into this guide when I wanted 
to implement a demo for my presentation (which I know now is meant for 
development on the Flink Core): 
https://ci.apache.org/projects/flink/flink-docs-master/internals/ide_setup.html#intellij-idea
But anyway, I wanted to tell you that the Scala Compiler Plugin can’t be 
installed as instructed, because there’s no „Install Jetbrains Plugin…“ option 
in the dialog (see the screenshot attached).
I’m using IntelliJ IDEA 2016.2.5, Build #IU-162.2228.15, built on October 14, 
2016 on macOS 10.12.1.

Many thanks in advance!
Cheers
Hanna






Re: Consistency guarantees on multiple sinks

2017-01-05 Thread Paris Carbone
Hi Nancy,

Flink’s vanilla rollback recovery mechanism restarts computation from a global 
checkpoint; thus, sink duplicates (job output) can occur no matter how many 
sinks are declared. The whole computation in the failed execution graph will 
roll back.

cheers
Paris
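The duplicates Paris describes are commonly mitigated with an idempotent sink that remembers which record IDs were already committed and skips them on replay. A toy sketch only (the committed-ID set here is an in-memory stand-in for a durable store that must survive failures together with the output, which is exactly the hard part in a real system):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of an idempotent sink: on replay after a rollback, records whose
// IDs were already committed are skipped, so the target sees each record once.
public class IdempotentSink {
    private final Set<String> committed = new HashSet<>();
    private final List<String> output = new ArrayList<>();

    public void push(String recordId, String payload) {
        if (committed.add(recordId)) { // add() returns false if already delivered
            output.add(payload);
        }
    }

    public List<String> delivered() {
        return output;
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.push("r1", "a");
        sink.push("r2", "b");
        // rollback + replay re-sends r2 along with the new r3
        sink.push("r2", "b");
        sink.push("r3", "c");
        System.out.println(sink.delivered()); // [a, b, c]
    }
}
```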


> On 5 Jan 2017, at 14:24, Nancy Estrada  wrote:
> 
> Hi,
> 
> If there is more than one sink declared in a job, what happens when a
> failure occurs? Do all the sink operations get aborted (atomically, as in a
> transactional environment), or are the exactly-once-processing consistency
> guarantees provided only when one sink is declared per job? Is it
> recommended to have more than one sink per job?
> 
> Thank you!
> Nancy Estrada
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Consistency-guarantees-on-multiple-sinks-tp10877.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: A question about iterations and prioritizing "control" over "data" inputs

2017-03-23 Thread Paris Carbone
I think what Theo meant is to allow for different (high/low) priorities on 
different channels (or data streams per se) for n-ary operators such as 
ConnectedStream binary maps, loops, etc., not to change the sequence of events 
within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing 
algorithm. The checkpoint barriers anyway block committed stream partitions so 
there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger <rmetz...@apache.org> wrote:

To very quickly respond to Theo's question: No, it is not possible to have 
records overtake each other in the buffer.
This could potentially void the exactly-once processing guarantees in the case 
where records overtake checkpoint barriers.


On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp <kathleen.sh...@signavio.com> wrote:
Hi,

I have a similar sounding use case and just yesterday was
experimenting with this approach:

Use 2 separate streams: one for model events, one for data events.
Connect these 2, key the resulting stream and then use a
RichCoFlatMapFunction to ensure that each data event is enriched with
the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives, emit all previously seen events
with this new model event.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so
that downstream operators can remove/add as necessary depending on the
calculations (so for each model change I would emit an add/remove pair
of enriched events).

As I say I have only experimented with this yesterday, perhaps someone
a bit more experienced with flink might spot some problems with this
approach, which I would definitely be interested in hearing.

Kat
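Kathleen's RichCoFlatMapFunction pattern can be reduced to the following toy (single key, no Flink API, and without the add/remove command pairs she mentions): buffer data events, enrich each with the latest model, and re-emit everything when a new model arrives. Class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of the connected-streams enrichment pattern: data events are
// buffered and enriched with the latest model; a new model re-emits every
// previously seen event with the new model.
public class ModelDataJoin {
    private String model;                                  // latest model event
    private final List<String> events = new ArrayList<>(); // data events seen so far
    private final List<String> out = new ArrayList<>();

    public void onData(String event) {
        events.add(event);
        if (model != null) out.add(event + "@" + model); // enrich immediately
    }

    public void onModel(String newModel) {
        model = newModel;
        for (String e : events) out.add(e + "@" + newModel); // re-emit with new model
    }

    public List<String> emitted() {
        return out;
    }

    public static void main(String[] args) {
        ModelDataJoin op = new ModelDataJoin();
        op.onData("e1");   // no model yet, buffered only
        op.onModel("m1");  // re-emits e1 with m1
        op.onData("e2");   // enriched with m1 immediately
        op.onModel("m2");  // re-emits e1 and e2 with m2
        System.out.println(op.emitted()); // [e1@m1, e2@m1, e1@m2, e2@m2]
    }
}
```

In the real Flink version, `onData`/`onModel` would be the two `flatMap` methods of a `RichCoFlatMapFunction` and the buffers would live in keyed state.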

On Wed, Mar 15, 2017 at 2:20 PM, Theodore Vasiloudis
<theodoros.vasilou...@gmail.com> wrote:
> Hello all,
>
> I've started thinking about online learning in Flink and one of the issues
> that has come
> up in other frameworks is the ability to prioritize "control" over "data"
> events in iterations.
>
> To set an example, say we develop an ML model that ingests events in
> parallel, performs an aggregation to update the model, and then
> broadcasts the updated model back through an iteration/back edge.
> Using the above nomenclature, the events being ingested would be
> "data" events, and the model update would be a "control" event.
>
> I talked about this scenario a bit with a couple of people (Paris and
> Gianmarco), and one thing we would like to have is the ability to
> prioritize the ingestion of control events over the data events.
>
> If my understanding is correct, currently there is a buffer/queue of events
> waiting to be processed
> for each operator, and each incoming event ends up at the end of that queue.
>
> If our data source is fast, and the model updates slow, a lot of data events
> might be buffered/scheduled
> to be processed before each model update, because of the speed difference
> between the two
> streams. But we would like to update the model that is used to process data
> events as soon as
> the newest version becomes available.
>
> Is it somehow possible to make the control events "jump" the queue and be
> processed as soon
> as they arrive over the data events?
>
> Regards,
> Theodore
>
> P.S. This is still very much a theoretical problem, I haven't looked at how
> such a pipeline would
> be implemented in Flink.




Re: A question about iterations and prioritizing "control" over "data" inputs

2017-03-23 Thread Paris Carbone
Unless I got this wrong, if he meant relaxing FIFO processing per 
channel/stream partition then Robert is absolutely right.

On 23 Mar 2017, at 12:28, Paris Carbone <par...@kth.se> wrote:

I think what Theo meant is to allow for different (high/low) priorities on 
different channels (or data streams per se) for n-ary operators such as 
ConnectedStream binary maps, loops, etc., not to change the sequence of events 
within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing 
algorithm. The checkpoint barriers anyway block committed stream partitions so 
there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger <rmetz...@apache.org> wrote:

To very quickly respond to Theo's question: No, it is not possible to have 
records overtake each other in the buffer.
This could potentially void the exactly-once processing guarantees in the case 
where records overtake checkpoint barriers.







Re: Questions on GSoC project: Query optimisation layer for Flink Streaming

2015-03-20 Thread Paris Carbone
Hi Wepngong,

This is an interesting proposal. There are indeed many streaming optimisations 
out there, but as Gyula said, we should focus on a few and engineer them in a 
nice way. Perhaps for the time being it makes sense to focus on a streaming 
job-graph optimiser that applies optimisations by statically analysing the 
graph before submitting it, e.g. query re-writing, operator reordering, 
operator sharing, intermediate result sharing, etc. A runtime optimiser that 
can do things like load balancing and online reconfiguration would certainly 
be the next step.

cheers
Paris

On 20 Mar 2015, at 12:48, Gyula Fóra <gyf...@apache.org> wrote:

Hey,

Of course the aim of the project would not be to implement all possible 
optimizations because that would be impossible to do so in such short time :)

It would be nice if one could carefully select some optimizations that would 
make the most impact on the performance and implement those.

Regards,
Gyula

On Fri, Mar 20, 2015 at 5:15 AM, Wepngong Benaiah <bwepng...@gmail.com> wrote:
Hello,
I have been doing some research on
https://issues.apache.org/jira/browse/FLINK-1617 using
http://hirzels.com/martin/papers/csur14-streamopt.pdf and others. I
found out that there are many optimization techniques available, like:
1. OPERATOR REORDERING
2. REDUNDANCY ELIMINATION
3. OPERATOR SEPARATION
4. FUSION
5. FISSION
6. LOAD BALANCING
7. STATE SHARING

My question is: do I need to choose one or two of the algorithms and
implement them for GSoC, or am I required to implement all of them,
given the tight constraints of the GSoC timeline?
Need help @Gyula Fora

--
Wepngong Ngeh Benaiah

"Black holes are where God divided by zero. "




Re: Superstep-like synchronization of streaming iteration

2018-10-01 Thread Paris Carbone
Hi Christian,

It is great to see iterative use cases, thanks for sharing your problem!

Superstep (BSP) iterative synchronization for streams is a problem we have 
been looking into extensively; however, this functionality is not yet 
standardised in Flink.
I think your use case is fully covered by our proposed approach, described in 
a research talk at Flink Forward 2018 in Berlin [1] (there is probably a video 
available too on the data Artisans website).
Take a look, and in case this approach satisfies your needs and you would like 
to test out your application with our current prototype, please do PM me!

Paris

[1] 
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner  
> wrote:
> 
> Hi all,
> 
> 
> if you don't want to read the wall of text below, in short, I want to know if 
> it is possible to get a superstep-based iteration on a possibly unbounded 
> DataStream in Flink in an efficient way and what general concept(s) of 
> synchronization you would suggest for that.
> 
> 
> I would like to write a program that has different vertices (realized just as 
> Longs for now) in a graph which all store a keyed state and communicate with 
> each other with messages that arrive in an iterated stream.
> 
> From the outside I would only get the messages to add (or, possibly in the 
> future, delete, however that can be ignored for now) a certain vertex with 
> some possible additional information specified by the program (this message 
> can be assumed to have the same form as any other message) and then the rest 
> would happen through an iterated stream keyed by the vertex to which the 
> message is adressed in which a vertex through a KeyedProcessFunction (or 
> KeyedBroadcastProcessFunction if a BroadcastStream is used for 
> synchronization) can send new messages to any other vertex (or itself) based 
> on the received message(s) and its own current state and can also update its 
> state based on the received message(s). The new messages would then be fed 
> back into the iterated stream. If no synchronization is done this works quite 
> well, however it doesn't produce helpful results for my problem since no 
> order in which the messages arrive can be guaranteed.
> 
> What I would optimally like to have is a pregel-like superstep-based 
> iteration which runs on a batch of outside messages (here: vertex additions) 
> until no more messages are produced and after that repeats that with the next 
> batch of vertices either infinitely or until there are no more new messages 
> received. During the execution of each batch all vertices (including older 
> ones) can be activated again by receiving a message and the state of each 
> vertex should be preserved throughout the execution of the program. The 
> problem lies in how I can seperate the messages into supersteps in an 
> iterative partitioned stream similar to the iterations in the DataSetAPI.
> 
> One idea I had was just making tumbling windows of a large enough amount of
> time, which would just collect all the messages and then emit them in a
> ProcessWindowFunction once the window fires. While this would be quite a
> simple solution that requires little non-parallel synchronization, it would
> obviously require that we know a time span within which we can be guaranteed
> that all messages have been processed and all new messages for the next
> superstep produced, which is realistically not the case. It would also mean
> that in most supersteps the program would wait longer than necessary until
> it starts the next superstep. Fault tolerance would also be very hard to
> achieve.
> 
> Another more complex idea was to just globally synchronize with an object 
> that remembers which vertices have been sent messages in the previous 
> superstep by being informed before any message is sent and then is also 
> informed when a vertex is done with processing a message and informs the 
> vertex if there globally are no more messages to be processed. If that is the 
> case the vertex then sends a NextSuperstep message which is broadcast to all 
> partitions with a BroadcastStream. After that all vertices can start with 
> processing all messages sent to them in the previous superstep. Other than 
> not being trivially to synchronize without any problems (which I'm working on 
> myself) this approach has the obvious disadvantage that a lot of information 
> has to be passed to this object in a globally synchronized manner which kind 
> of kills the point of parallel processing. Although it is obvious that some 
> global synchronization probably has to take place this approach seems rather 
> ineffective to m
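The superstep semantics Christian asks for (process all messages of round n before any message of round n+1; stop when no new messages are produced) can be sketched as a plain BSP loop. A toy example propagating the maximum value around a ring of vertices; illustrative only, with no Flink involved and made-up names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain BSP loop: all messages of superstep n are processed before any
// message of superstep n+1, and iteration stops once no new messages are
// produced. Each vertex stores a long and adopts the maximum value it
// receives; messages travel around a ring of n vertices.
public class SuperstepLoop {
    public static Map<Integer, Long> run(Map<Integer, Long> initial, int n) {
        Map<Integer, Long> state = new HashMap<>(initial);
        // superstep 0: every vertex tells its ring neighbour its value
        Map<Integer, Long> inbox = new HashMap<>();
        state.forEach((v, val) -> inbox.merge((v + 1) % n, val, Math::max));
        while (!inbox.isEmpty()) {            // implicit barrier between rounds
            Map<Integer, Long> next = new HashMap<>();
            for (Map.Entry<Integer, Long> m : inbox.entrySet()) {
                int v = m.getKey();
                long msg = m.getValue();
                if (msg > state.get(v)) {     // the message activates the vertex
                    state.put(v, msg);
                    next.merge((v + 1) % n, msg, Math::max);
                }
            }
            inbox = next;                     // advance to the next superstep
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Integer, Long> init = new HashMap<>();
        init.put(0, 5L);
        init.put(1, 1L);
        init.put(2, 3L);
        System.out.println(run(init, 3)); // every vertex converges to 5
    }
}
```

The hard part on a streaming runtime is precisely the `while` barrier: detecting, across parallel tasks, that a superstep's inbox is globally empty.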

Re: FLIP-16, FLIP-15 Status Updates?

2019-02-21 Thread Paris Carbone
I created these FLIPs a while back; sorry for being late to this discussion, 
but I can try to elaborate.

The idea from FLIP-16 is proven to be correct [1] (see chapter 3), and I think 
it is the only way to go, but I have been in favour of providing channel 
implementations with checkpoint behaviour built in.
I do have some pretty outdated Flink forks with these patches resolved, but it 
is not an elegant implementation. If I am not mistaken, people looked into 
checkpointing in-transit channel state in Blink, and that is a good use case 
to (re)use that part of the logic imho.

Regarding FLIP-15 and beyond, there are a few things we have to agree on 
before stream iterations are finalised.
We have looked a LOT into this ([1], see chapter 6), also together with Vasia 
(CC), and discussed with other committers how to allow for a BSP iterative 
model on windows and extensions to dynamic graphs, ML, etc.
Some of these ideas were also planned to be integrated into Blink, but I think 
this is still in the prototyping phase at Alibaba; please correct me if I am 
wrong.
I am very happy there is finally some interest in this! The FLIP discussions 
were dead for years. Maybe it is time to start planning it properly together; 
it is a super cool feature ;)

cheers
Paris

[1] http://kth.diva-portal.org/smash/get/diva2:1240814/FULLTEXT01.pdf



On 21 Feb 2019, at 19:17, Stephan Ewen <se...@apache.org> wrote:

Hi John!

I know some committers are working on iterations, but as part of a bigger 
update that might eventually subsume FLIPs 15 and 16.
I believe they will share some part of that soon (in a few weeks).

Best,
Stephan


On Tue, Feb 19, 2019 at 5:45 PM John Tipper <john_tip...@hotmail.com> wrote:
Hi Timo,

That’s great, thank you very much. If I’d like to contribute, is it best to 
wait until the roadmap has been published? And is this the best list to ask on, 
or is the development mailing list better?

Many thanks,

John

Sent from my iPhone

> On 19 Feb 2019, at 16:29, Timo Walther <twal...@apache.org> wrote:
>
> Hi John,
>
> you are right that there has not been much progress around these two FLIPs in 
> the last years, mostly due to a shift of priorities. However, with the big 
> Blink code contribution from Alibaba and joint development forces for a 
> unified batch and streaming runtime [1], it is very likely that iterations, 
> and thus machine learning algorithms, will also see more development effort.
>
> The community is working on a roadmap page for the website, and I can already 
> reveal that a new iteration model is mentioned there. The new Flink roadmap 
> page can be expected in the next 2-3 weeks.
>
> I hope this information helps.
>
> Regards,
> Timo
>
> [1] 
> https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
>
>> Am 19.02.19 um 12:47 schrieb John Tipper:
>> Hi All,
>>
>> Does anyone know what the current status is for FLIP-16 (loop fault 
>> tolerance) and FLIP-15 (redesign iterations) please? I can see lots of work 
>> back in 2016, but it all seemed to stop and go quiet since about March 2017. 
>> I see iterations as offering very interesting capabilities for Flink, so it 
>> would be good to understand how we can get this moving again.
>>
>> Many thanks,
>>
>> John
>>
>> Sent from my iPhone
>
>



Re: Which test cluster to use for checkpointing tests?

2018-03-06 Thread Paris Carbone
Hey,

Indeed, checkpointing iterations and dealing with closed sources are orthogonal 
issues, which is why the latter is not part of FLIP-15. Though, you kinda need 
both to have meaningful checkpoints for jobs with iterations.
One has to do with correctness (checkpointing strongly connected components in 
the execution graph), the other with termination (terminating the 
checkpointing protocol when certain tasks ‘finish’).

I am willing to help out resolving the first issue, though I prefer to wait for 
the ongoing changes in the network model and FLIP-6 to be finalised before 
applying this change properly (are they?).

Paris

> On 6 Mar 2018, at 10:51, Nico Kruber  wrote:
> 
> Hi Ken,
> sorry, I was mislead by the fact that you are using iterations and those
> were only documented for the DataSet API.
> 
> Running checkpoints with closed sources sounds like a more general thing
> than being part of the iterations rework of FLIP-15. I couldn't dig up
> anything on jira regarding this improvement either.
> 
> @Stephan: is this documented somewhere?
> 
> 
> Nico
> 
> On 02/03/18 23:55, Ken Krugler wrote:
>> Hi Stephan,
>> 
>> Thanks for the update.
>> 
>> So is support for “running checkpoints with closed sources” part of 
>> FLIP-15, or something separate?
>> 
>> Regards,
>> 
>> — Ken
>> 
>>> On Mar 1, 2018, at 9:07 AM, Stephan Ewen wrote:
>>> 
>>> @Ken The issue you are running into is that Checkpointing works
>>> currently only until the job reaches the point where the pipeline
>>> starts to drain out, meaning when the sources are done. In your case,
>>> the source is done immediately, sending out only one tuple.
>>> 
>>> Running checkpoints with closed sources is something that's on the
>>> feature list and will come soon…
>> 
>> 
>> http://about.me/kkrugler
>> +1 530-210-6378
>> 
> 



Re: Iterative Stream won't loop

2018-05-11 Thread Paris Carbone
Hey!

I would recommend against using iterations with windows for that problem at the 
moment.
Alongside loop scoping and backpressure, which will be addressed by FLIP-15 
[1], I think you also need the notion of stream supersteps, which is 
experimental work in progress for now, from my side at least.

Until these features are added to Flink, I would recommend trying out 
gelly-streams [2], a Flink API for graph stream computation which supports 
connected components in a single pass.
All you need to do is convert your stream into edge additions. Give it a try 
and let us know what you think [2].
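For intuition, "connected components in a single pass" over edge additions can be sketched without Flink, using a union-find structure (a hypothetical plain-Python illustration, not the gelly-streams API):

```python
# Each incoming edge addition is processed exactly once: the two endpoint
# components are merged in a union-find structure, so no iteration or
# superstep machinery is needed to reach the component labelling.
def connected_components(edge_stream):
    parent = {}

    def find(v):
        parent.setdefault(v, v)
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving for near-O(1) finds
            v = parent[v]
        return v

    for u, v in edge_stream:               # single pass over edge additions
        ru, rv = find(u), find(v)
        if ru != rv:
            parent[max(ru, rv)] = min(ru, rv)  # smaller id becomes the label

    return {v: find(v) for v in parent}

labels = connected_components([(1, 2), (3, 4), (2, 3), (5, 6)])
assert labels == {1: 1, 2: 1, 3: 1, 4: 1, 5: 5, 6: 5}
```

The actual gelly-streams implementation is distributed and windowed, but the single-pass idea is the same.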

Paris

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
[2] https://github.com/vasia/gelly-streaming

On 11 May 2018, at 10:08, Henrique Colao Zanuz <henrique.co...@gmail.com> 
wrote:

Hi,
thank you for your reply

Actually, I'm doing something very similar to your code. The problem I'm having 
is that this structure is not generating any loop. For instance, if I print 
labelsVerticesGroup, I only see the initial set of tuples, plus the one from 
updatedLabelsVerticesGroup at the end of the first iteration, and nothing 
more. So the content of updatedLabelsVerticesGroup is indeed being correctly 
assigned to labelsVerticesGroup, but the loop itself is not happening.

For simplicity's sake, I'm omitting here the logic that separates the tuples 
that need to be fed back to the loop. I do understand that both versions of the 
code we discussed are expected to loop indefinitely. In the complete version of 
mine, I use a JoinFunction, a ProcessAllWindowFunction, a FilterFunction and a 
Map: the Map creates a flag that indicates whether the label of a vertex 
changed during the join, the ProcessAllWindowFunction spreads this flag to the 
whole window in case any tuple changed, and finally I filter the tuples by this 
flag. This mechanism separates the tuples as expected. However, even if I 
remove this logic from the code, so as to get an infinite loop of the tuples 
(as in the code we've written in the previous emails), the iteration does not 
work.

PS. I've been using Flink 1.3.3

Best,
Henrique

On Fri, May 11, 2018 at 00:01, Rong Rong <walter...@gmail.com> wrote:
Based on the pseudo code, it seems like you are trying to do the loop yourself 
rather than using the iterative stream's map() function [1].

I think you would need to specify the "map" function in order to use the 
iterative stream, and there should be a clear definition of which data is 
iterative. In this case you have labels & vertices interlacing each other, but 
no specific loop back.

I would suggest something close to the example in [1], like

 // open the loop explicitly, so closeWith() is called on an IterativeStream
 labelsVerticesGroup = inputDataStream.iterate()

 labels = labelsVerticesGroup.map(...)
  .keyBy(VertexID)
  .window(...)
  .min(label);

 vertices = labelsVerticesGroup.map(...)

 updatedLabelsVerticesGroup = 
  vertices.join(labels).where(VertexId).equalTo(VertexId)
   .windowAll(...)
   .agg(...)

 labelsVerticesGroup.closeWith(updatedLabelsVerticesGroup)
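For reference, the fixed point this loop is meant to reach can be simulated outside Flink (plain Python, illustrative names only):

```python
# Minimum-label propagation: in every round each vertex takes the minimum
# label among itself and its neighbours, repeating until no label changes.
# The "while changed" loop plays the role the closeWith() feedback edge
# would play in the streaming job.
def min_label_propagation(vertices):
    """vertices: dict vertex_id -> set of neighbour ids."""
    labels = {v: v for v in vertices}        # initial label = own id
    changed = True
    while changed:
        changed = False
        for v, neighbours in vertices.items():
            new = min([labels[v]] + [labels[n] for n in neighbours])
            if new != labels[v]:
                labels[v] = new
                changed = True
    return labels

# two components: {1, 2, 3} and {4, 5}
graph = {1: {2}, 2: {1, 3}, 3: {2}, 4: {5}, 5: {4}}
assert min_label_propagation(graph) == {1: 1, 2: 1, 3: 1, 4: 4, 5: 4}
```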

Is this what you are looking for?

--
Rong

Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations

On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz 
<henrique.co...@gmail.com> wrote:
Hi,

I am trying to implement a connected components algorithm using DataStream. For 
this algorithm, I'm separating the data by tumbling windows. So, for each 
window, I'm trying to compute it independently.
This algorithm is iterative because the labels (colors) of the vertices need to 
be propagated. Basically, I need to iterate over the following steps:

Input: vertices = DataStream of 

Loop:
 labels = vertices.flatmap (emitting a tuple  for every  
vertices.f0 and every element of vertices.f1)
  .keyBy(VertexID)
  .window(...)
  .min(label);

 updatedVertices = vertices. join(labels).where(VertexId).equalTo(VertexId)
   .windowAll(...)
   .apply(re-emit original vertices stream 
tuples, but keeping the new labels)

End loop

I am trying to use IterativeStreams to do so. However, despite successfully 
separating the tuples that need to be fed back to the loop (using filters and 
closeWith), the subsequent iterations are not happening, so what I get is only 
the first iteration.
I suppose this might come from the fact that I'm creating a new stream (labels) 
based on the original IterativeStream, joining it with the original one 
(vertices), and only then closing the loop with it.
Do you know whether Flink has some limitation in this respect? If so, would you 
have a hint about a different approach I could take for this algorithm to avoid 
this?

thank you in advance,
Henrique Colao