Sounds good to me. But I still need to have some kind of side output 
(cassandra) that stores the accumulating aggregates on each time scale (minute, 
hour). Thus I would need to have something like this

var hourly = stream.window(1.hour).apply(..)
//write to cassandra
hourly.trigger(accumulating).addSink(cassandra)
//forward to next acc step
var daily = hourly.trigger(discarding).window(1.day).apply(…)
//write to cassandra
daily.trigger(accumulating).addSink(cassandra)

Would this be possible?

best, Stephan
> On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing 
> List archive.] <ml-node+s2336050n10294...@n4.nabble.com> wrote:
> 
> You can implement discarding behaviour by writing a custom trigger (based on 
> EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you 
> could maybe implement a cascade of windows where the first aggregates for the 
> smallest time interval and is discarding and where the other triggers take 
> these "pre-aggregated" values and accumulate.
> 
> On Tue, 22 Nov 2016 at 08:11 Stephan Epping <[hidden email] 
> <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=0>> wrote:
> Hey Aljoscha,
> 
> the first solution did not work out as expected. As when late elements arrive 
> the first window is triggered again and would emit a new (accumulated) event, 
> that would be counted twice (in time accumulation and late accumulation) in 
> the second window.I could implement my own (discarding strategy) like in 
> Apache Beam, but the out stream should contain accumulated events that are 
> stored in cassandra. The second solution just gave an compiler error, thus I 
> think is not possible right now.
> 
> best Stephan
> 
> 
> 
>> On 21 Nov 2016, at 17:56, Aljoscha Krettek <[hidden email] 
>> <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=1>> wrote:
>> 
>> Hi,
>> why did you settle for the last solution?
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Thu, 17 Nov 2016 at 15:57 kaelumania <[hidden email] 
>> <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=2>> wrote:
>> Hi Fabian,
>> 
>> your proposed solution for:
>>  
>> Multiple window aggregations
>> You can construct a data flow of cascading window operators and fork off (to 
>> emit or further processing) the result after each window.
>> 
>> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>>                         \-> out_1        \-> out_2         \-> out_3
>> does not work, am I missing something?
>> 
>> First I tried the following
>> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new 
>> StrictWatermarkAssigner()); // force lateness
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = values
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.minutes(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>>         .keyBy("id")
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new AggregateReadingAggregates(), new 
>> AggregateReadingAggregates());
>> but due to late data the first fold function would emit 2 rolling aggregates 
>> (one with and one without the late element), which results in being counted 
>> twice within the second reducer. Therefore i tried
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
>> force lateness
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = 
>> readingsPerMinute
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = 
>> readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new 
>> ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> which gives me a compiler error as WindowedStream does not provide a 
>> timeWindow method.
>> 
>> Finally I settled with this:
>> KeyedStream<Reading, Tuple> readings = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // 
>> force lateness
>>         .keyBy("id");
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = readings
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new 
>> AggregateReadings());
>> 
>> 
>> Feedback is very welcome.
>> 
>> best, Stephan
>> 
>> 
>> 
>> 
>>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing 
>>> List archive.] <[hidden email] 
>>> <http://user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>>> 
>> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
>> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a 
>>> href="x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0 <>" 
>>> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive 
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>)
>>>  but was unable/dont know how to reply.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see StackOverflor 
>>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data
>>>>  
>>>> <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>).
>>>>  I am pretty new to flink, could you elaborate on a possible solution? We 
>>>> can guarantee good ordering by sensor_id, thus watermarking by key would 
>>>> be the only reasonable way for us 
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my 
>>>> own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... 
>>>> per key? Or maybe using custom state plus a custom trigger? What happens 
>>>> if a sensor dies or is being removed completely, how can this be detected 
>>>> as watermarks would be ignored for window garbage collection. Or could we 
>>>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
>> 
>>> If you reply to this email, your message will be added to the discussion 
>>> below:
>> 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html>
>>> To unsubscribe from Maintaining watermarks per key, instead of per operator 
>>> instance, click here <>.
>>> NAML 
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>> View this message in context: Re: Maintaining watermarks per key, instead of 
>> per operator instance 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at 
>> Nabble.com <http://nabble.com/>.
> 
> 
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10294.html
>  
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10294.html>
> To unsubscribe from Maintaining watermarks per key, instead of per operator 
> instance, click here 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=7288&code=c3RlcGhhbi5lcHBpbmdAendlaXRhZy5kZXw3Mjg4fC0yNzYyODY4NzI=>.
> NAML 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10295.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to