Re: Generate watermarks per key in a KeyedStream

2017-11-09 Thread Derek VerLee

We are contending with the same issue, as it happens. We have dozens, and,
potentially, down the line, may need to deal with thousands of different
"time systems", as you put it, which may not be known at compile time or
job start time. In a practical sense, how could such a system be composed?


On 11/9/17 5:52 AM, Shailesh Jain wrote:

> Thanks for your reply, Xingcan.
>
> On Wed, Nov 8, 2017 at 10:42 PM, Xingcan Cui wrote:
>
>> Hi Shailesh,
>>
>> Actually, the watermarks are generated per partition, but all of them
>> will be forcibly aligned to the minimum one during processing. That is
>> decided by the semantics of watermarks and KeyedStream, i.e., the
>> watermarks belong to the whole stream, and a stream is made up of
>> different partitions (one per key).
>>
>> If the physical devices work in different time systems due to delays,
>> the event streams from them should be treated separately.
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>> On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain wrote:
>>
>>> Hi,
>>>
>>> I'm working on implementing a use case wherein different physical
>>> devices are sending events, and due to network/power issues, there can
>>> be a delay in receiving events at the Flink source. One of the
>>> operators within the Flink job is the Pattern operator, and there are
>>> certain patterns which are time sensitive, so I'm using the event time
>>> characteristic. But the problem comes when there are unpredictable
>>> delays in events from a particular device(s), which causes those
>>> events to be dropped (as I cannot really define a static bound to
>>> allow for lateness).
>>>
>>> Since I'm using a KeyedStream, keyed on the source device ID, is there
>>> a way to allow each CEP operator instance (one per key) to progress
>>> its time based on the event time in the corresponding stream
>>> partition? Or, in other words, is there a way to generate watermarks
>>> per partition in a KeyedStream?
>>>
>>> Thanks,
>>> Shailesh

Re: Generate watermarks per key in a KeyedStream

2017-11-09 Thread Shailesh Jain
Thanks for your reply, Xingcan.

On Wed, Nov 8, 2017 at 10:42 PM, Xingcan Cui  wrote:

> Hi Shailesh,
>
> actually, the watermarks are generated per partition, but all of them will
> be forcibly aligned to the minimum one during processing. That is decided
> by the semantics of watermark and KeyedStream, i.e., the watermarks belong
> to a whole stream and a stream is made up of different partitions (one per
> key).
>
> If the physical devices work in different time systems due to delay, the
> event streams from them should be treated separately.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Hi,
>>
>> I'm working on implementing a use case wherein different physical devices
>> are sending events, and due to network/power issues, there can be a delay
>> in receiving events at the Flink source. One of the operators within the
>> Flink job is the Pattern operator, and there are certain patterns which are
>> time sensitive, so I'm using the event time characteristic. But the problem
>> comes when there are unpredictable delays in events from a particular
>> device(s), which causes those events to be dropped (as I cannot really
>> define a static bound to allow for lateness).
>>
>> Since I'm using a KeyedStream, keyed on the source device ID, is there a
>> way to allow each CEP operator instance (one per key) to progress its time
>> based on the event time in the corresponding stream partition? Or, in other
>> words, is there a way to generate watermarks per partition in a KeyedStream?
>>
>> Thanks,
>> Shailesh
>>
>
>


Re: Generate watermarks per key in a KeyedStream

2017-11-08 Thread Xingcan Cui
Hi Shailesh,

Actually, the watermarks are generated per partition, but all of them will
be forcibly aligned to the minimum one during processing. That is decided
by the semantics of watermarks and KeyedStream, i.e., the watermarks belong
to the whole stream, and a stream is made up of different partitions (one
per key).

If the physical devices work in different time systems due to delays, the
event streams from them should be treated separately.

Hope that helps.

Best,
Xingcan

On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain 
wrote:

> Hi,
>
> I'm working on implementing a use case wherein different physical devices
> are sending events, and due to network/power issues, there can be a delay
> in receiving events at Flink source. One of the operators within the flink
> job is the Pattern operator, and there are certain patterns which are time
> sensitive, so I'm using Event time characteristic. But the problem comes
> when there are unpredictable delays in events from a particular device(s),
> which causes those events to be dropped (as I cannot really define a static
> bound to allow for lateness).
>
> Since I'm using a KeyedStream, keyed on the source device ID, is there a
> way to allow each CEP operator instance (one per key) to progress its time
> based on the event time in the corresponding stream partition. Or in other
> words, is there a way to generate watermarks per partition in a KeyedStream?
>
> Thanks,
> Shailesh
>
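
The alignment rule Xingcan describes can be sketched in a few lines of plain
Java (a simplified model written for this thread, not Flink code): an operator
reading several input partitions tracks the last watermark per channel and
only ever advances its event-time clock to the minimum of them, so one delayed
partition holds back event time for every key handled by that operator.

```java
import java.util.Arrays;

// Plain-Java sketch (not Flink code) of per-partition watermark alignment:
// the operator's combined watermark is the minimum over all input channels.
class WatermarkAligner {
    private final long[] channelWatermarks;

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Record a watermark on one channel; return the operator's combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

With two channels, advancing the fast one to 100 while the slow one sits at 50
leaves the combined watermark at 50 — exactly why per-key progress stalls.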


Generate watermarks per key in a KeyedStream

2017-11-08 Thread Shailesh Jain
Hi,

I'm working on implementing a use case wherein different physical devices
are sending events, and due to network/power issues, there can be a delay
in receiving events at the Flink source. One of the operators within the
Flink job is the Pattern operator, and there are certain patterns which are
time sensitive, so I'm using the event time characteristic. But the problem
comes when there are unpredictable delays in events from a particular
device(s), which causes those events to be dropped (as I cannot really
define a static bound to allow for lateness).

Since I'm using a KeyedStream, keyed on the source device ID, is there a
way to allow each CEP operator instance (one per key) to progress its time
based on the event time in the corresponding stream partition? Or, in other
words, is there a way to generate watermarks per partition in a KeyedStream?

Thanks,
Shailesh


Re: Watermarks per key

2017-02-20 Thread jganoff
There's nothing stopping me from assigning timestamps and generating
watermarks on a keyed stream in my implementation, and the KeyedStream API
supports it. It appears, though, that the underlying operator created in
DataStream.assignTimestampsAndWatermarks() isn't key-aware and tracks
timestamps globally. Is that what's technically preventing per-key timestamp
assignment from working?

I'm curious to hear Aljoscha's thoughts on watermark management across keys.

Thanks!
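
The observed behaviour can be modeled with a minimal plain-Java sketch (an
assumed simplification written for illustration, not Flink's actual operator
code): the timestamp-extraction operator keeps a single max-timestamp counter
per operator instance and ignores the key, so one fast key advances the
watermark candidate for every slow key routed to the same instance.

```java
// Hypothetical sketch of a timestamp tracker that is NOT key-aware:
// one counter per operator instance, shared by all keys it processes.
class GlobalTimestampTracker {
    private long maxTimestamp = Long.MIN_VALUE;

    /** The key is accepted but ignored -- timestamps are tracked globally. */
    long onEvent(String key, long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return maxTimestamp; // watermark candidate for the whole instance
    }
}
```

After a "fast" key reaches timestamp 1000, a "slow" key's events at 200 are
already behind the instance-wide watermark candidate.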





Re: Watermarks per key

2017-02-15 Thread Fabian Hueske
Hi Jordan,

it is not possible to generate watermarks per key. This feature has been
requested a couple of times, but I think there are no plans to implement it.
As far as I understand, the management of per-key watermarks would be quite
expensive (maintaining several watermarks, purging the watermarks of expired
keys, etc.), but Aljoscha (in CC) can share details about that.

Best,
Fabian

2017-02-15 2:02 GMT+01:00 Jordan Ganoff <jor...@corvana.com>:

> Hi,
>
> I’m designing a streaming job whose elements need to be windowed by event
> time across a large set of keys. All elements are read from the same
> source. Event time progresses independently across keys. Is it possible to
> assign timestamps, and thus generate independent watermarks, per keyed
> stream, so late arriving elements can be handled per keyed stream?
>
> And in general, what’s the best approach to designing a job that needs to
> process different keyed streams whose event times do not relate to each
> other? My current approach generates timestamps at the source but never
> generates watermarks so no record is ever considered late. This has the
> unfortunate side effect of windows never closing. As a result, each event
> time window relies on a custom trigger which fires and purges the window
> after a given amount of processing time elapses during which no new records
> arrived.
>
> Thanks,
> Jordan
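
The bookkeeping cost Fabian alludes to can be sketched in plain Java
(hypothetical code written for this thread, not a Flink API): one watermark
per key, plus a last-seen processing time used to purge keys that go idle —
exactly the "maintaining several watermarks, purging expired keys" overhead.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical per-key watermark bookkeeping: max event timestamp per key
// (minus an out-of-orderness bound) plus idle-key purging.
class PerKeyWatermarks {
    private final Map<String, Long> maxTimestampPerKey = new HashMap<>();
    private final Map<String, Long> lastSeenProcessingTime = new HashMap<>();
    private final long maxOutOfOrderness;

    PerKeyWatermarks(long maxOutOfOrderness) { this.maxOutOfOrderness = maxOutOfOrderness; }

    /** Per-key watermark = max event timestamp for that key minus the bound. */
    long onEvent(String key, long eventTimestamp, long processingTime) {
        maxTimestampPerKey.merge(key, eventTimestamp, Math::max);
        lastSeenProcessingTime.put(key, processingTime);
        return maxTimestampPerKey.get(key) - maxOutOfOrderness;
    }

    /** Drop state for keys idle longer than idleTimeout of processing time. */
    void purgeIdleKeys(long processingTime, long idleTimeout) {
        Iterator<Map.Entry<String, Long>> it = lastSeenProcessingTime.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (processingTime - e.getValue() > idleTimeout) {
                maxTimestampPerKey.remove(e.getKey());
                it.remove();
            }
        }
    }

    int trackedKeys() { return maxTimestampPerKey.size(); }
}
```

Note that the state grows with the number of live keys, and a dead sensor's
state only disappears via the explicit purge — the problem raised later in
this thread about detecting removed sensors.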


Watermarks per key

2017-02-14 Thread Jordan Ganoff
Hi,

I’m designing a streaming job whose elements need to be windowed by event time 
across a large set of keys. All elements are read from the same source. Event 
time progresses independently across keys. Is it possible to assign timestamps, 
and thus generate independent watermarks, per keyed stream, so late arriving 
elements can be handled per keyed stream?

And in general, what’s the best approach to designing a job that needs to 
process different keyed streams whose event times do not relate to each other? 
My current approach generates timestamps at the source but never generates 
watermarks so no record is ever considered late. This has the unfortunate side 
effect of windows never closing. As a result, each event time window relies on 
a custom trigger which fires and purges the window after a given amount of 
processing time elapses during which no new records arrived.

Thanks,
Jordan
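
Jordan's workaround — no watermarks, plus a custom trigger that fires and
purges after a quiet period of processing time — can be sketched as follows
(plain Java, an assumed simplification of Flink's Trigger contract, written
for illustration):

```java
// Hypothetical inactivity trigger: fire and purge a window once a configured
// span of processing time passes with no new records assigned to it.
class InactivityTrigger {
    private final long inactivityGap;   // quiet period that closes the window
    private long lastElementAt = Long.MIN_VALUE;
    private boolean fired = false;

    InactivityTrigger(long inactivityGap) { this.inactivityGap = inactivityGap; }

    /** Called for every record assigned to the window. */
    void onElement(long processingTime) {
        lastElementAt = processingTime;
        fired = false; // a new record re-opens the window until the next quiet period
    }

    /** Called from a processing-time timer; true means FIRE_AND_PURGE. */
    boolean onTimer(long processingTime) {
        if (!fired && lastElementAt != Long.MIN_VALUE
                && processingTime - lastElementAt >= inactivityGap) {
            fired = true;
            return true;
        }
        return false;
    }
}
```

The trade-off is visible in the sketch: window lifetime is bound to wall-clock
inactivity rather than event-time progress, which is exactly why the windows
never close on their own without watermarks.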

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-23 Thread Aljoscha Krettek
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = 
> readingsPerMinute
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2));
>
> DataStream aggregatesPerMinute = 
> readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> DataStream aggregatesPerHour = readingsPerHours.apply(new 
> ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id");
>
> DataStream aggregatesPerMinute = readings
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
> DataStream aggregatesPerHour = readings
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>


Re: Maintaining watermarks per key, instead of per operator instance

2016-11-23 Thread kaelumania
new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> 
>> DataStream aggregatesPerHour = readings
>> .timeWindow(Time.hours(1))
>> .allowedLateness(Time.hours(2))
>> .apply(new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> 
>> 
>> Feedback is very welcome.
>> 
>> best, Stephan
>> 
>> 
>> 
>> 
>>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing 
>>> List archive.] <[hidden email]> wrote:
>>> 
>> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
>> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive 
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>)
>>>  but was unable/dont know how to reply.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see StackOverflor 
>>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data
>>>>  
>>>> <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>).
>>>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>>>> can guarantee good ordering by sensor_id, thus watermarking by key would 
>>>> be the only reasonable way for us 
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>>>> own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... 
>>>> per key? Or maybe using custom state plus a custom trigger? What happens 
>>>> if a sensor dies or is being removed completely, how can this be detected 
>>>> as watermarks would be ignored for window garbage collection. Or could we 
>>>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
>> 
> 
> 
> 





Re: Maintaining watermarks per key, instead of per operator instance

2016-11-23 Thread Aljoscha Krettek
You can implement discarding behaviour by writing a custom trigger (based
on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you
could maybe implement a cascade of windows where the first aggregates for
the smallest time interval and is discarding, and where the other triggers
take these "pre-aggregated" values and accumulate them.
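
A minimal plain-Java model of this cascade (assumed semantics written for
illustration, not Flink's windowing API) shows why discarding avoids double
counting: because the minute stage purges its state when it fires, each fired
value is a pure increment, and the hour stage can simply accumulate increments
even when late data re-fires a minute window.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a discarding pre-aggregation cascade: minute sums are purged on
// firing (FIRE_AND_PURGE), and the hour stage accumulates the fired values.
class DiscardingCascade {
    private final Map<Long, Long> minuteSums = new HashMap<>(); // pending, not yet fired
    private final Map<Long, Long> hourSums = new HashMap<>();   // accumulated results

    void onEvent(long timestampMillis, long value) {
        minuteSums.merge(timestampMillis / 60_000, value, Long::sum);
    }

    /** Fire and purge one minute window, feeding its sum into the hour window. */
    void fireMinute(long minuteIndex) {
        Long sum = minuteSums.remove(minuteIndex); // purge: late data starts a fresh increment
        if (sum != null) {
            hourSums.merge(minuteIndex / 60, sum, Long::sum);
        }
    }

    long hourSum(long hourIndex) { return hourSums.getOrDefault(hourIndex, 0L); }
}
```

A late element arriving after minute 0 has fired only contributes its own
increment on the next firing, instead of re-emitting the whole minute sum.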

On Tue, 22 Nov 2016 at 08:11 Stephan Epping <stephan.epp...@zweitag.de>
wrote:

> Hey Aljoscha,
>
> the first solution did not work out as expected, as when late elements
> arrive the first window is triggered again and would emit a new
> (accumulated) event, which would be counted twice (in the on-time
> accumulation and the late accumulation) in the second window. I could
> implement my own discarding strategy, like in Apache Beam, but the output
> stream should contain accumulated events that are stored in Cassandra. The
> second solution just gave a compiler error, thus I think it is not possible
> right now.
>
> best Stephan
>
>
>
> On 21 Nov 2016, at 17:56, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> why did you settle for the last solution?
>
> Cheers,
> Aljoscha
>
> On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epp...@zweitag.de> wrote:
>
> Hi Fabian,
>
> your proposed solution for:
>
>
>1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>               \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream values = input.assignTimestampsAndWatermarks(new 
> StrictWatermarkAssigner()); // force lateness
>
> DataStream aggregatesPerMinute = values
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.minutes(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
> DataStream aggregatesPerHour = aggregatesPerMinute
> .keyBy("id")
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new AggregateReadingAggregates(), new 
> AggregateReadingAggregates());
>
> but due to late data the first fold function would emit 2 rolling
> aggregates (one with and one without the late element), which results in
> being counted twice within the second reducer. Therefore I tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = 
> readingsPerMinute
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2));
>
> DataStream aggregatesPerMinute = 
> readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> DataStream aggregatesPerHour = readingsPerHours.apply(new 
> ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id");
>
> DataStream aggregatesPerMinute = readings
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
> DataStream aggregatesPerHour = readings
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-21 Thread Stephan Epping
Hey Aljoscha,

the first solution did not work out as expected, as when late elements arrive
the first window is triggered again and would emit a new (accumulated) event,
which would be counted twice (in the on-time accumulation and the late
accumulation) in the second window. I could implement my own discarding
strategy, like in Apache Beam, but the output stream should contain
accumulated events that are stored in Cassandra. The second solution just
gave a compiler error, thus I think it is not possible right now.

best Stephan



> On 21 Nov 2016, at 17:56, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> why did you settle for the last solution?
> 
> Cheers,
> Aljoscha
> 
> On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epp...@zweitag.de 
> <mailto:stephan.epp...@zweitag.de>> wrote:
> Hi Fabian,
> 
> your proposed solution for:
>  
> Multiple window aggregations
> You can construct a data flow of cascading window operators and fork off (to 
> emit or further processing) the result after each window.
> 
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>               \-> out_1        \-> out_2         \-> out_3
> does not work, am I missing something?
> 
> First I tried the following
> DataStream values = input.assignTimestampsAndWatermarks(new 
> StrictWatermarkAssigner()); // force lateness
> 
> DataStream aggregatesPerMinute = values
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.minutes(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> 
> DataStream aggregatesPerHour = aggregatesPerMinute
> .keyBy("id")
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new AggregateReadingAggregates(), new 
> AggregateReadingAggregates());
> but due to late data the first fold function would emit 2 rolling aggregates 
> (one with and one without the late element), which results in being counted 
> twice within the second reducer. Therefore I tried
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2));
> 
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = 
> readingsPerMinute
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2));
> 
> DataStream aggregatesPerMinute = 
> readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> DataStream aggregatesPerHour = readingsPerHours.apply(new 
> ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> which gives me a compiler error as WindowedStream does not provide a 
> timeWindow method.
> 
> Finally I settled with this:
> KeyedStream<Reading, Tuple> readings = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id");
> 
> DataStream aggregatesPerMinute = readings
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> 
> DataStream aggregatesPerHour = readings
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> 
> 
> Feedback is very welcome.
> 
> best, Stephan
> 
> 
> 
> 
>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing 
>> List archive.] <[hidden email]> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I just wrote an answer to your SO question. 
>> 
>> Best, Fabian
> 
>> 
>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>> 
>> Hello,
>> 
>> I found this question in the Nabble archive 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html
>>  
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>)
>>  but was unable/dont know how to repl

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-21 Thread Aljoscha Krettek
Hi,
why did you settle for the last solution?

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epp...@zweitag.de> wrote:

> Hi Fabian,
>
> your proposed solution for:
>
>
>1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>               \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream values = input.assignTimestampsAndWatermarks(new 
> StrictWatermarkAssigner()); // force lateness
>
> DataStream aggregatesPerMinute = values
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.minutes(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
> DataStream aggregatesPerHour = aggregatesPerMinute
> .keyBy("id")
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new AggregateReadingAggregates(), new 
> AggregateReadingAggregates());
>
> but due to late data the first fold function would emit 2 rolling
> aggregates (one with and one without the late element), which results in
> being counted twice within the second reducer. Therefore I tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id")
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = 
> readingsPerMinute
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2));
>
> DataStream aggregatesPerMinute = 
> readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
> DataStream aggregatesPerHour = readingsPerHours.apply(new 
> ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
> .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
> force lateness
> .keyBy("id");
>
> DataStream aggregatesPerMinute = readings
> .timeWindow(Time.minutes(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
> DataStream aggregatesPerHour = readings
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(2))
> .apply(new ReadingAggregate(), new AggregateReadings(), new 
> AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
&

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-17 Thread kaelumania
Hi Fabian,

your proposed solution for:
 
Multiple window aggregations
You can construct a data flow of cascading window operators and fork off (to 
emit or for further processing) the result after each window.

Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
             \-> out_1       \-> out_2        \-> out_3
does not work, am I missing something?

First I tried the following
DataStream<Reading> values = input
    .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

DataStream<ReadingAggregate> aggregatesPerMinute = values
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.minutes(2))
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
    .keyBy("id")
    .timeWindow(Time.hours(1))
    .allowedLateness(Time.hours(2))
    .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
but due to late data the first fold function would emit two rolling aggregates 
(one with and one without the late element), which results in the window being 
counted twice within the second reducer. Therefore I tried:
WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
    .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.hours(2));

WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
    .timeWindow(Time.hours(1))
    .allowedLateness(Time.hours(2));

DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
which gives me a compiler error as WindowedStream does not provide a timeWindow 
method.

Finally I settled with this:
KeyedStream<Reading, Tuple> readings = input
    .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
    .keyBy("id");

DataStream<ReadingAggregate> aggregatesPerMinute = readings
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.hours(2))
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = readings
    .timeWindow(Time.hours(1))
    .allowedLateness(Time.hours(2))
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());


Feedback is very welcome.

best, Stephan
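The double counting above can be shown with a small plain-Java sketch (no Flink here; the emission format and helper names are purely illustrative): forwarding every re-fired minute aggregate into a summing hour aggregator counts a late-updated window twice, while keeping only the latest aggregate per minute window does not.

```java
import java.util.HashMap;
import java.util.Map;

// Why cascading incremental aggregates double-counts late data, and an
// upsert-style fix. Plain Java; each emission is {minuteWindowStart, sum}.
public class CascadeDoubleCount {

    // Naive hour aggregator: adds every per-minute aggregate it receives,
    // so a re-fired (late-updated) minute window is counted twice.
    static long naiveHourSum(long[][] minuteEmissions) {
        long sum = 0;
        for (long[] e : minuteEmissions) sum += e[1];
        return sum;
    }

    // Upsert hour aggregator: keeps only the latest aggregate per minute
    // window, so an updated emission replaces the earlier one.
    static long upsertHourSum(long[][] minuteEmissions) {
        Map<Long, Long> latestPerMinute = new HashMap<>();
        for (long[] e : minuteEmissions) latestPerMinute.put(e[0], e[1]);
        long sum = 0;
        for (long v : latestPerMinute.values()) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        // Minute window starting at 0 first fires with sum 10; a late element
        // arrives and the same window re-fires with sum 13.
        long[][] emissions = { {0L, 10L}, {60L, 5L}, {0L, 13L} };
        System.out.println(naiveHourSum(emissions));  // 28 - window 0 counted twice
        System.out.println(upsertHourSum(emissions)); // 18 - update replaces old value
    }
}
```

In a pipeline this corresponds to keying the downstream aggregation by (id, minuteWindowStart) and replacing rather than adding, e.g. via an idempotent upsert sink.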



> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List 
> archive.] <ml-node+s2336050n10033...@n4.nabble.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> Hello,
> 
> I found this question in the Nabble archive 
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
> but was unable to / didn't know how to reply.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see StackOverflow 
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>> can guarantee good ordering by sensor_id, thus watermarking by key would be 
>> the only reasonable way for us 
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>> own watermarking after *sensorData.keyBy('id').overwriteWatermarking()*... per 
>> key? Or maybe using custom state plus a custom trigger? What happens if a 
>> sensor dies or is being removed completely, how can this be detected as 
>> watermarks would be ignored for window garbage collection. Or could we 
>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
> 
> 
> Thanks,
> Stephan
> 
> 
> 
> 
> 

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-15 Thread Stephan Epping
>> multiple for the time window due to processing time)
>> - then update the aggregate and put it into a cassandra sink again
>> 
>> The cassandra select will be a bit slower than using an in memory/flink 
>> state, but will be cheaper in the end. Further, what does this have for 
>> consequences?
>> For example, replaying events will be more difficult, right? Also, what 
>> about Snapshots? Will they work with the mentioned design?
>> 
>> kind regards,
>> Stephan
> 
>>> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:
>>> 
> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive 
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
>>> but was unable to / didn't know how to reply.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see StackOverflow 
>>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>>>> can guarantee good ordering by sensor_id, thus watermarking by key would 
>>>> be the only reasonable way for us 
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>>>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()... 
>>>> per key? Or maybe using custom state plus a custom trigger? What happens 
>>>> if a sensor dies or is being removed completely, how can this be detected 
>>>> as watermarks would be ignored for window garbage collection. Or could we 
>>>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
> 
>>> 
>> 
>> 
>> 
>> 



Re: Maintaining watermarks per key, instead of per operator instance

2016-11-14 Thread Aljoscha Krettek
Hi Stephan,
I was going to suggest that using a flatMap and tracking the timestamp of
each key yourself is a bit like having a per-key watermark. I wanted to
wait a bit before answering because I'm currently working on a new type of
function that will be released with Flink 1.2: ProcessFunction. This is
somewhat like a FlatMap but also allows you to access the element timestamp,
query the current processing time/event time, and set (per-key) timers for
processing time and event time. With this you should be able to easily
implement your per-key tracking, I hope.

Cheers,
Aljoscha

P.S. ProcessFunction is already in the Flink repository but it's called
TimelyFlatMapFunction right now, because I was working on it under that
working title.
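The per-key tracking idea can be sketched in plain Java (illustrative names only; this is not the actual ProcessFunction/TimelyFlatMapFunction API): each key keeps its own maximum timestamp as a per-key watermark and its own "timer" (the end of its current window), so one key's progress never closes another key's window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of per-key event-time tracking with a per-key "timer",
// mimicking what a ProcessFunction-style operator could do. All names are
// illustrative; this is not the actual Flink API.
public class PerKeyTimeTracker {
    private final long windowSize;
    private final Map<String, Long> maxTsPerKey = new HashMap<>();     // per-key watermark
    private final Map<String, Long> windowEndPerKey = new HashMap<>(); // per-key timer
    private final Map<String, Long> sumPerKey = new HashMap<>();
    private final List<String> fired = new ArrayList<>();

    PerKeyTimeTracker(long windowSize) { this.windowSize = windowSize; }

    // Process one element; a key's window fires only when that key's own
    // event time passes the window end - other keys' progress is irrelevant.
    void process(String key, long ts, long value) {
        long keyTime = Math.max(ts, maxTsPerKey.getOrDefault(key, Long.MIN_VALUE));
        maxTsPerKey.put(key, keyTime);
        long end = windowEndPerKey.computeIfAbsent(key, k -> (ts / windowSize + 1) * windowSize);
        if (keyTime >= end) {
            // per-key timer fires: emit the closed window, start a new one
            fired.add(key + "@" + end + "=" + sumPerKey.getOrDefault(key, 0L));
            sumPerKey.put(key, value);
            windowEndPerKey.put(key, (ts / windowSize + 1) * windowSize);
        } else {
            sumPerKey.merge(key, value, Long::sum);
        }
    }

    List<String> firedWindows() { return fired; }

    public static void main(String[] args) {
        PerKeyTimeTracker t = new PerKeyTimeTracker(60);
        t.process("a", 10, 1);
        t.process("b", 500, 7); // b's high timestamp does not close a's window
        t.process("a", 30, 2);  // still on time for key a
        t.process("a", 70, 3);  // key a passes 60: window [0,60) fires with sum 3
        System.out.println(t.firedWindows()); // [a@60=3]
    }
}
```

A real operator would additionally register a processing-time timer per key to flush windows of keys that stop sending data, which is exactly what ProcessFunction's timers are for.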

On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de> wrote:

> Hey Fabian,
>
> thank you very much.
>
> - yes, I would window by event time and fire/purge by processing time
> - Cheaper in the end meant, that having too much state in the flink
> cluster would be more expensive, as we store all data in cassandra too.I
> think the fault tolerance would be okay, as we would make a compare and set
> with cassandra.
>
> With the flatMap Operator wouldn’t it be like running my own windowing
> mechanism? I need to keep the aggregate window per sensor open (with
> checkpointing and state management) until I receive an element for a sensor
> that is later in time than the windows time and then purge the state and
> emit a new event (which is like having a watermark per sensor). Further, I
> need a timer that fires like after 24 hours, in case a sensor dies and
> doesn’t send more data which might is possible with window
> assigner/trigger, right? But not inside normal functions, e.g. flatMap? We
> can guarantee that all sensor data per sensor comes almost in order (might
> be out of order within a few seconds), but there might be gaps of several
> hours after network partitions.
>
> There is now way to define/redefine the watermark per keyed stream? Or
> adjust the window assigner + trigger to achieve the desired behaviour? I am
> a bit reserved in implementing the whole state management. Do you plan to
> support such use cases on keyed streams? Maybe the WatermarkAssigner could
> also receive information about the key for wich the watermark should be
> calculated etc.
>
> best, Stephan
>
>
> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I'm skeptical about two things:
> - using processing time will result in inaccurately bounded aggregates (or
> do you want to group by event time in a processing time window?)
> - writing to and reading from Cassandra might be expensive (not sure what
> you mean by cheaper in the end) and it is not integrated with Flink's
> checkpointing mechanism for fault-tolerance.
>
> To me, the stateful FlatMapOperator looks like the best approach. There is
> an upcoming feature for registering timers in user-functions, i.e., a
> function is called after the timer exceeds. This could be helpful to
> overcome the problem of closing the window without new data.
>
> Best,
> Fabian
>
>
> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
>
> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
>
> The cassandra select will be a bit slower than using an in memory/flink
> state, but will be cheaper in the end. Further, what does this have for
> consequences?
> For example, replaying events will be more difficult, right? Also, what
> about Snapshots? Will they work with the mentioned design?
>
> kind regards,
> Stephan
>
> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-14 Thread kaelumania
Hey Fabian,

thank you very much. 

- yes, I would window by event time and fire/purge by processing time
- "Cheaper in the end" meant that having too much state in the Flink cluster 
would be more expensive, as we store all data in Cassandra too. I think the 
fault tolerance would be okay, as we would do a compare-and-set with 
Cassandra.

With the flatMap operator, wouldn't it be like running my own windowing 
mechanism? I need to keep the aggregate window per sensor open (with 
checkpointing and state management) until I receive an element for a sensor 
that is later in time than the window's end time, and then purge the state and 
emit a new event (which is like having a watermark per sensor). Further, I need 
a timer that fires after, say, 24 hours, in case a sensor dies and doesn't send 
more data, which might be possible with a window assigner/trigger, right? But 
not inside normal functions, e.g. flatMap? We can guarantee that all sensor data 
per sensor comes almost in order (might be out of order within a few seconds), 
but there might be gaps of several hours after network partitions.

Is there no way to define/redefine the watermark per keyed stream? Or adjust 
the window assigner + trigger to achieve the desired behaviour? I am a bit 
reserved about implementing the whole state management myself. Do you plan to 
support such use cases on keyed streams? Maybe the WatermarkAssigner could also 
receive information about the key for which the watermark should be calculated, etc.

best, Stephan


> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing List 
> archive.] <ml-node+s2336050n10094...@n4.nabble.com> wrote:
> 
> Hi Stephan,
> 
> I'm skeptical about two things: 
> - using processing time will result in inaccurately bounded aggregates (or do 
> you want to group by event time in a processing time window?)
> - writing to and reading from Cassandra might be expensive (not sure what you 
> mean by cheaper in the end) and it is not integrated with Flink's 
> checkpointing mechanism for fault-tolerance.
> 
> To me, the stateful FlatMapOperator looks like the best approach. There is an 
> upcoming feature for registering timers in user-functions, i.e., a function 
> is called after the timer exceeds. This could be helpful to overcome the 
> problem of closing the window without new data.
> 
> Best, 
> Fabian
> 
> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
> Hello Fabian,
> 
> Thank you very much. What is your opinion on the following solution:
> 
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be 
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
> 
> The cassandra select will be a bit slower than using an in memory/flink 
> state, but will be cheaper in the end. Further, what does this have for 
> consequences?
> For example, replaying events will be more difficult, right? Also, what about 
> Snapshots? Will they work with the mentioned design?
> 
> kind regards,
> Stephan
> 
> 
>> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:
>> 
>> Hi Stephan,
>> 
>> I just wrote an answer to your SO question. 
>> 
>> Best, Fabian
>> 
>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>> Hello,
>> 
>> I found this question in the Nabble archive 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
>> but was unable to / didn't know how to reply.
>> 
>> Here is my question regarding the mentioned thread:
>> 
>>> Hello, 
>>> 
>>> I have similar requirements (see StackOverflow 
>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>>> can guarantee good ordering by sensor_id, thus watermarking by key would be 
>>> the only reasonable way for us 
>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()... per 
>>> key? Or maybe using

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-14 Thread Fabian Hueske
Hi Stephan,

I'm skeptical about two things:
- using processing time will result in inaccurately bounded aggregates (or
do you want to group by event time in a processing time window?)
- writing to and reading from Cassandra might be expensive (not sure what
you mean by cheaper in the end) and it is not integrated with Flink's
checkpointing mechanism for fault-tolerance.

To me, the stateful FlatMapOperator looks like the best approach. There is
an upcoming feature for registering timers in user functions, i.e., a
function is called after the timer expires. This could be helpful to
overcome the problem of closing the window without new data.

Best,
Fabian

2016-11-14 8:39 GMT+01:00 Stephan Epping <stephan.epp...@zweitag.de>:

> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
>
> The cassandra select will be a bit slower than using an in memory/flink
> state, but will be cheaper in the end. Further, what does this have for
> consequences?
> For example, replaying events will be more difficult, right? Also, what
> about Snapshots? Will they work with the mentioned design?
>
> kind regards,
> Stephan
>
>
> On 11 Nov 2016, at 00:39, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <stephan.epp...@zweitag.de>:
>
>> Hello,
>>
>> I found this question in the Nabble archive 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
>> but was unable to / didn't know how to reply.
>>
>> Here is my question regarding the mentioned thread:
>>
>> Hello,
>>
>> I have similar requirements (see StackOverflow 
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data). 
>> I am pretty new to flink, could you
>> elaborate on a possible solution? We can guarantee good ordering by
>> sensor_id, thus watermarking by key would be the only reasonable way for us
>> (*sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
>> my own watermarking after *sensorData.keyBy('id').overwriteWatermarking()*...
>> per key? Or maybe using custom state plus a custom trigger? What happens if
>> a sensor dies or is being removed completely, how can this be detected as
>> watermarks would be ignored for window garbage collection. Or could we
>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>
>>
>> Thanks,
>> Stephan
>>
>>
>>
>
>


Re: Maintaining watermarks per key, instead of per operator instance

2016-11-13 Thread Stephan Epping
Hello Fabian,

Thank you very much. What is your opinion on the following solution:

- Window data per time window, e.g. 15 minutes
- using processing time as trigger, e.g. 15 minutes
- which results in an aggregate over sensor values
- then use cassandra to select the previous aggregate (as there can be multiple 
for the time window due to processing time)
- then update the aggregate and put it into a cassandra sink again

The Cassandra select will be a bit slower than using in-memory/Flink state, 
but will be cheaper in the end. Further, what consequences does this have?
For example, replaying events will be more difficult, right? Also, what about 
snapshots? Will they work with the mentioned design?

kind regards,
Stephan
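The read-update-write cycle proposed here can be sketched in plain Java, with a ConcurrentHashMap standing in for Cassandra and its conditional writes (all names are illustrative; a real Cassandra driver would use INSERT/UPDATE ... IF statements):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Read-update-write of per-window aggregates against an external store, with
// compare-and-set retries. A ConcurrentHashMap stands in for Cassandra;
// names are illustrative, not a real driver API.
public class AggregateUpsert {
    // key = sensorId + "/" + windowStart, value = running sum for that window
    static final Map<String, Long> store = new ConcurrentHashMap<>();

    // Merge a partial aggregate into the stored one; retry if a concurrent
    // writer changed the row between our read and our conditional write.
    static long mergeAggregate(String sensorId, long windowStart, long partialSum) {
        String key = sensorId + "/" + windowStart;
        while (true) {
            Long old = store.get(key);
            long merged = (old == null ? 0L : old) + partialSum;
            boolean ok = (old == null)
                    ? store.putIfAbsent(key, merged) == null // like INSERT ... IF NOT EXISTS
                    : store.replace(key, old, merged);       // like UPDATE ... IF value = old
            if (ok) return merged;
        }
    }

    public static void main(String[] args) {
        mergeAggregate("sensor-1", 0L, 10L);
        System.out.println(mergeAggregate("sensor-1", 0L, 5L)); // 15
    }
}
```

Note that merging partial sums this way is not idempotent: replaying the same events adds them again, so exactly-once replay would additionally require recording which inputs have already been applied - which matches the replay concern raised in the message above.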


> On 11 Nov 2016, at 00:39, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <stephan.epp...@zweitag.de 
> <mailto:stephan.epp...@zweitag.de>>:
> Hello,
> 
> I found this question in the Nabble archive 
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
> but was unable to / didn't know how to reply.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see StackOverflow 
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>> can guarantee good ordering by sensor_id, thus watermarking by key would be 
>> the only reasonable way for us 
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()... per 
>> key? Or maybe using custom state plus a custom trigger? What happens if a 
>> sensor dies or is being removed completely, how can this be detected as 
>> watermarks would be ignored for window garbage collection. Or could we 
>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
> 
> 
> Thanks,
> Stephan
> 
> 
> 



Re: Maintaining watermarks per key, instead of per operator instance

2016-11-10 Thread Fabian Hueske
Hi Stephan,

I just wrote an answer to your SO question.

Best, Fabian

2016-11-10 11:01 GMT+01:00 Stephan Epping <stephan.epp...@zweitag.de>:

> Hello,
>
> I found this question in the Nabble archive 
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) 
> but was unable to / didn't know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflow 
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data). 
> I am pretty new to flink,
> could you elaborate on a possible solution? We can guarantee good ordering
> by sensor_id, thus watermarking by key would be the only reasonable way for
> us (*sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I
> do my own watermarking after
> *sensorData.keyBy('id').overwriteWatermarking()*... per key? Or maybe
> using custom state plus a custom trigger? What happens if a sensor dies or
> is being removed completely, how can this be detected as watermarks would
> be ignored for window garbage collection. Or could we dynamically schedule
> a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
>


Re: Maintaining watermarks per key, instead of per operator instance

2016-06-02 Thread Kanstantsin Kamkou
Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
The task is pretty similar, but I have to ignore once the next
triggered event for the same key.


On Wed, Jun 1, 2016 at 2:54 PM, Aljoscha Krettek  wrote:
> Hi,
> yeah, in that case per-key watermarks would be useful for you. It won't be
> possible to add such a feature, though, due to the (possibly) dynamic nature
> of the key space and how watermark tracking works.
>
> You should be able to implement it with relatively low overhead using a
> RichFlatMapFunction and keyed state. This is the relevant section of the
> doc:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface.
>
> We are also in the process of improving our windowing system, especially
> when it comes to late data, cleanup and trigger semantics. You can have a
> look here if you're interested:
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing.
>
> Best,
> Aljoscha
>
> On Tue, 31 May 2016 at 14:36  wrote:
>>
>> Hi Aljoscha,
>>
>> thanks for the speedy reply.
>>
>> I am processing measurements delivered by smart meters. I use windows to
>> gather measurements and calculate values such as average consumption. The
>> key is simply the meter ID.
>>
>> The challenge is that meters may undergo network partitioning, under which
>> they fall back to local buffering. The data is then transmitted once
>> connectivity has been re-established. I am using event time to obtain
>> accurate calculations.
>>
>> If a specific meter goes offline, and the watermark progresses to the next
>> window for an operator instance, then all late data will be discarded once
>> that meter is online again, until it has caught up to the event time. This
>> is because I am using a custom EventTimeTrigger implementation that discards
>> late elements. The reason for that is because Flink would otherwise
>> immediately evaluate the window upon receiving a late element, which is a
>> problem since my calculations (e.g. the average consumption) depend on
>> multiple elements. I cannot calculate averages with that single late
>> element.
>>
>> Each individual meter guarantees in-order transmission of measurements. If
>> watermarks progressed per key, then i would never have late elements because
>> of that guarantee. I would be able to accurately calculate averages, with
>> the trade-off that my results would arrive sporadically from the same
>> operator instance.
>>
>> I suppose I could bypass the use of windows by implementing a stateful map
>> function that mimics windows to a certain degree. I implemented something
>> similar in Storm, but the amount of application logic required is
>> substantial.
>>
>> I completely understand why Flink evaluates a window on a late element,
>> since there is no other way to know when to evaluate the window as event
>> time has already progressed.
>>
>> Perhaps there is a way to gather/redirect late elements?
>>
>> Regards
>> Leon
>>
>> 31. May 2016 13:37 by aljos...@apache.org:
>>
>>
>> Hi,
>> I'm afraid this is impossible with the current design of Flink. Might I
>> ask what you want to achieve with this? Maybe we can come up with a
>> solution.
>>
>> -Aljoscha
>>
>> On Tue, 31 May 2016 at 13:24  wrote:
>>>
>>> My use case primarily concerns applying transformations per key, with the
>>> keys remaining fixed throughout the topology. I am using event time for my
>>> windows.
>>>
>>> The problem i am currently facing is that watermarks in windows propagate
>>> per operator instance, meaning the operator event time increases for all
>>> keys that the operator is in charge of. I wish for watermarks to progress
>>> per key, not per operator instance.
>>>
>>> Is this easily possible? I was unable to find an appropriate solution
>>> based on existing code recipes.
>>>
>>> Greetings
>>> Leon
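Leon's point can be made concrete with a small plain-Java comparison (no Flink involved; Event and the counting helpers are illustrative only): the same interleaving produces late elements under a single operator-wide watermark, but none when each key tracks its own watermark, because every meter sends in order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Compares lateness under one operator-wide watermark vs. per-key watermarks.
// Plain Java (no Flink); Event and the helpers are illustrative only.
public class LatenessComparison {
    record Event(String key, long ts) {}

    // Elements behind the single operator-wide watermark count as late.
    static int lateByOperatorWatermark(List<Event> events) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (e.ts() < watermark) late++;
            watermark = Math.max(watermark, e.ts());
        }
        return late;
    }

    // Elements behind their own key's watermark count as late.
    static int lateByKeyWatermark(List<Event> events) {
        Map<String, Long> wm = new HashMap<>();
        int late = 0;
        for (Event e : events) {
            if (e.ts() < wm.getOrDefault(e.key(), Long.MIN_VALUE)) late++;
            wm.merge(e.key(), e.ts(), Math::max);
        }
        return late;
    }

    public static void main(String[] args) {
        // Meter m2 was partitioned and flushes its buffer after m1 advanced time.
        List<Event> events = List.of(
                new Event("m1", 100), new Event("m1", 200), new Event("m1", 300),
                new Event("m2", 50), new Event("m2", 150), new Event("m2", 250));
        System.out.println(lateByOperatorWatermark(events)); // 3 - all of m2's buffer is "late"
        System.out.println(lateByKeyWatermark(events));      // 0 - each meter is in order
    }
}
```

Gathering/redirecting the late elements, as asked above, would amount to routing the hits of the first counter to a side channel instead of dropping them.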


Re: Maintaining watermarks per key, instead of per operator instance

2016-06-02 Thread leon_mclare
Hi again Aljoscha,

understood. Thanks for the link. I really like the straightforward approach 
concerning storing state. It makes things very easy.

The improvements are very interesting, particularly the composite triggers. 
That would significantly improve flexibility.

Kind regards
Leon


Re: Maintaining watermarks per key, instead of per operator instance

2016-06-01 Thread Aljoscha Krettek
Hi,
yeah, in that case per-key watermarks would be useful for you. It won't be
possible to add such a feature, though, due to the (possibly) dynamic
nature of the key space and how watermark tracking works.

You should be able to implement it with relatively low overhead using a
RichFlatMapFunction and keyed state. This is the relevant section of the
doc:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface
.
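For instance, the core of that approach can be sketched as follows. This is plain Python with illustrative names, not Flink code: the dict stands in for Flink's keyed ValueState, and the method plays the role of a RichFlatMapFunction's flatMap for one keyed element.

```python
# Sketch of per-key event-time tracking, as one might do inside a
# RichFlatMapFunction with keyed state. The dict below plays the role
# of Flink's keyed ValueState; all names here are illustrative.

class PerKeyEventTimeTracker:
    def __init__(self, allowed_lateness_ms=0):
        self.allowed_lateness_ms = allowed_lateness_ms
        # Keyed "state": the highest timestamp seen so far for each key.
        self.max_timestamp = {}

    def process(self, key, timestamp, value):
        """Return (is_late, per_key_watermark) for this element."""
        current_max = self.max_timestamp.get(key, float("-inf"))
        # The per-key watermark only ever moves forward.
        self.max_timestamp[key] = max(current_max, timestamp)
        watermark = self.max_timestamp[key] - self.allowed_lateness_ms
        # An element is late only relative to its OWN key's clock,
        # never relative to other keys.
        is_late = timestamp < watermark
        return is_late, watermark
```

Because each device sends in order, an element would only test late here if it regressed behind its own key's clock, which is exactly the per-key behaviour asked for in this thread.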

We are also in the process of improving our windowing system, especially
when it comes to late data, cleanup and trigger semantics. You can have a
look here if you're interested:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
.

Best,
Aljoscha

On Tue, 31 May 2016 at 14:36  wrote:

> Hi Aljoscha,
>
> thanks for the speedy reply.
>
> I am processing measurements delivered by smart meters. I use windows to
> gather measurements and calculate values such as average consumption. The
> key is simply the meter ID.
>
> The challenge is that meters may undergo network partitioning, under which
> they fall back to local buffering. The data is then transmitted once
> connectivity has been re-established. I am using event time to obtain
> accurate calculations.
>
> If a specific meter goes offline, and the watermark progresses to the next
> window for an operator instance, then all late data will be discarded once
> that meter is online again, until it has caught up to the event time. This
> is because I am using a custom EventTimeTrigger implementation that
> discards late elements. The reason for that is because Flink would
> otherwise immediately evaluate the window upon receiving a late element,
> which is a problem since my calculations (e.g. the average consumption)
> depend on multiple elements. I cannot calculate averages with that single
> late element.
>
> Each individual meter guarantees in-order transmission of measurements. If
> watermarks progressed per key, then I would never have late elements
> because of that guarantee. I would be able to accurately calculate
> averages, with the trade-off that my results would arrive sporadically from
> the same operator instance.
>
> I suppose I could bypass the use of windows by implementing a stateful map
> function that mimics windows to a certain degree. I implemented something
> similar in Storm, but the amount of application logic required is
> substantial.
>
> I completely understand why Flink evaluates a window on a late element,
> since there is no other way to know when to evaluate the window as event
> time has already progressed.
>
> Perhaps there is a way to gather/redirect late elements?
>
> Regards
> Leon
>
> 31. May 2016 13:37 by aljos...@apache.org:
>
>
> Hi,
> I'm afraid this is impossible with the current design of Flink. Might I
> ask what you want to achieve with this? Maybe we can come up with a
> solution.
>
> -Aljoscha
>
> On Tue, 31 May 2016 at 13:24  wrote:
>
>> My use case primarily concerns applying transformations per key, with the
>> keys remaining fixed throughout the topology. I am using event time for my
>> windows.
>>
>> The problem I am currently facing is that watermarks in windows propagate
>> per operator instance, meaning the operator event time increases for all
>> keys that the operator is in charge of. I wish for watermarks to progress
>> per key, not per operator instance.
>>
>> Is this easily possible? I was unable to find an appropriate solution
>> based on existing code recipes.
>>
>> Greetings
>> Leon
>>
>


Re: Maintaining watermarks per key, instead of per operator instance

2016-05-31 Thread leon_mclare
Hi Aljoscha,

thanks for the speedy reply.

I am processing measurements delivered by smart meters. I use windows to 
gather measurements and calculate values such as average consumption. The key 
is simply the meter ID.

The challenge is that meters may undergo network partitioning, under which 
they fall back to local buffering. The data is then transmitted once 
connectivity has been re-established. I am using event time to obtain 
accurate calculations.

If a specific meter goes offline, and the watermark progresses to the next 
window for an operator instance, then all late data will be discarded once 
that meter is online again, until it has caught up to the event time. This is 
because I am using a custom EventTimeTrigger implementation that discards 
late elements. The reason for that is because Flink would otherwise 
immediately evaluate the window upon receiving a late element, which is a 
problem since my calculations (e.g. the average consumption) depend on 
multiple elements. I cannot calculate averages with that single late element.

Each individual meter guarantees in-order transmission of measurements. If 
watermarks progressed per key, then I would never have late elements because 
of that guarantee. I would be able to accurately calculate averages, with the 
trade-off that my results would arrive sporadically from the same operator 
instance.

I suppose I could bypass the use of windows by implementing a stateful map 
function that mimics windows to a certain degree. I implemented something 
similar in Storm, but the amount of application logic required is 
substantial.
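The core of such a window-mimicking stateful function might look like the sketch below. This is plain Python with illustrative names, not Flink or Storm code; it assumes in-order delivery per key, as described above, so a key entering a new window means all of that key's earlier windows are complete.

```python
from collections import defaultdict

# Sketch of "mimicking windows with a stateful map function", driven
# purely by each key's own event time rather than a global watermark.
class KeyedAverageWindows:
    def __init__(self, window_ms):
        self.window_ms = window_ms
        # Keyed buffers: (key, window_start) -> list of measurements.
        self.buffers = defaultdict(list)

    def process(self, key, timestamp, measurement):
        """Buffer the element; fire any windows this key has moved past.

        Returns a list of (key, window_start, average) results, usually empty.
        """
        window_start = timestamp - (timestamp % self.window_ms)
        results = []
        # Because each meter sends in order, an element in a new window
        # means all of this key's earlier windows can be evaluated now.
        for (k, ws) in sorted(w for w in self.buffers if w[0] == key):
            if ws < window_start:
                values = self.buffers.pop((k, ws))
                results.append((k, ws, sum(values) / len(values)))
        self.buffers[(key, window_start)].append(measurement)
        return results
```

Each key's windows fire on that key's own progress, so a meter that goes offline simply holds its buffers open until it reconnects, without dropping anything as late.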

I completely understand why Flink evaluates a window on a late element, since 
there is no other way to know when to evaluate the window as event time has 
already progressed.

Perhaps there is a way to gather/redirect late elements?

Regards
Leon

31. May 2016 13:37 by aljos...@apache.org:


> Hi,
> I'm afraid this is impossible with the current design of Flink. Might
> I ask what you want to achieve with this? Maybe we can come up with a
> solution.
>
> -Aljoscha
>
> On Tue, 31 May 2016 at 13:24 <leon_mcl...@tutanota.com> wrote:
>
>> My use case primarily concerns applying transformations per
>> key, with the keys remaining fixed throughout the topology. I am using 
>> event time for my windows.
>>
>> The problem I am currently facing is that watermarks in windows propagate 
>> per operator instance, meaning the operator event time increases for all 
>> keys that the operator is in charge of. I wish for watermarks to progress 
>> per key, not per operator instance.
>>
>> Is this easily possible? I was unable to find an appropriate solution 
>> based on existing code recipes.
>>
>> Greetings
>> Leon
>>   

Maintaining watermarks per key, instead of per operator instance

2016-05-31 Thread leon_mclare
My use case primarily concerns applying transformations per key, with the 
keys remaining fixed throughout the topology. I am using event time for my 
windows.

The problem I am currently facing is that watermarks in windows propagate per 
operator instance, meaning the operator event time increases for all keys 
that the operator is in charge of. I wish for watermarks to progress per key, 
not per operator instance.

Is this easily possible? I was unable to find an appropriate solution based 
on existing code recipes.

Greetings
Leon