Hi,

You defined a tumbling window 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows)
of 60 seconds, triggered every 10 seconds. Each firing re-emits the current 
result of the window, so each input element can be processed/averaged up to 
6 times (there is no other way if you trigger each window multiple times).
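
To make the "up to 6 times" concrete, here is a small plain-Java sketch (no 
Flink involved; the class and method names are made up for illustration). It 
assumes the trigger fires at exact multiples of 10 seconds, measured from the 
window start, and that the window contents are kept between firings. It counts 
how many firings of one 60-second window include an element that arrived at a 
given offset.

```java
public class FiringCount {
    // Counts the firings of a single window that see an element which
    // arrived arrivalSec seconds after the window start. Firings are assumed
    // to happen at intervalSec, 2*intervalSec, ..., up to windowSec.
    static int firingsSeeing(int arrivalSec, int windowSec, int intervalSec) {
        int count = 0;
        for (int t = intervalSec; t <= windowSec; t += intervalSec) {
            if (arrivalSec < t) {
                count++; // the element was already in the window at this firing
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (int arrival = 0; arrival < 60; arrival += 10) {
            System.out.println("arrival=" + arrival + "s -> "
                    + firingsSeeing(arrival, 60, 10) + " firings");
        }
    }
}
```

An element that arrives right at the window start is seen by all 6 firings, 
while one that arrives in the last 10 seconds is seen only once.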

I am not sure what you are trying to achieve, but please refer to the 
documentation about the different window types (tumbling, sliding, session); 
maybe it will clarify things for you:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners

If you want to avoid duplicated processing, use either a tumbling window with 
the default trigger (which fires once, at the end of the window) or session 
windows.
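
As a rough illustration of why a single firing per window avoids the skewed 
average, here is a plain-Java sketch (again no Flink; the class, the method 
and the sample numbers are invented for this example). It assumes two sessions 
with different keys in the same 60-second window, and that every firing 
forwards each session already present to the downstream average.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerComparison {
    // Each session is {arrivalSec, length}. The window is assumed to fire at
    // every multiple of intervalSec up to windowSec, and each firing emits
    // the length of every session that has arrived by then.
    static double averageDownstream(int[][] sessions, int windowSec, int intervalSec) {
        List<Integer> emitted = new ArrayList<>();
        for (int t = intervalSec; t <= windowSec; t += intervalSec) {
            for (int[] s : sessions) {
                if (s[0] < t) {
                    emitted.add(s[1]);
                }
            }
        }
        return emitted.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        int[][] sessions = { {0, 100}, {45, 20} };
        // Firing every 10 s: the first session is emitted 6 times, the second twice.
        System.out.println(averageDownstream(sessions, 60, 10)); // prints 80.0
        // Firing once at the end of the window: each session is emitted once.
        System.out.println(averageDownstream(sessions, 60, 60)); // prints 60.0
    }
}
```

With repeated firings the early session is over-weighted, so the downstream 
average drifts away from the true value of 60.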

Piotrek


> On 21 Dec 2017, at 13:29, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> 
> Hi guys,
> I have the following code:
> 
> SingleOutputStreamOperator<Event> lastUserSession = env
>         .socketTextStream("localhost", 9000, "\n")
>         .map(new MapFunction<String, Event>() {
>             @Override
>             public Event map(String value) throws Exception {
>                 String[] row = value.split(",");
>                 return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>             }
>         })
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>             @Override
>             public long extractTimestamp(Event element) {
>                 return element.timestamp;
>             }
>         })
>         .keyBy("userId", "sessionId")
>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>         .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>         .maxBy("length", false);
> 
> lastUserSession
>         .timeWindowAll(Time.seconds(60))
>         .aggregate(new AverageSessionLengthAcrossAllUsers())
>         .print();
> 
> What I'm trying to achieve is to calculate the average session length every 
> 10 seconds. The problem is that, since the window length is 60 seconds and a 
> computation is triggered every 10 seconds, I will receive duplicate events in 
> my average calculation method, so the average will not be correct. If I move 
> ContinuousProcessingTimeTrigger down, before 
> AverageSessionLengthAcrossAllUsers(), then it's not triggering every 10 
> seconds.
> Any other suggestions on how to work around this?
> 
> Thanks
