You can do this by first applying keyBy on userId and emitting the value
you want to average (the session length). Then feed that output into a
time-based all-window (timeWindowAll) whose AggregateFunction computes
and emits the average for each time window.
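The data flow can be made concrete with a plain-Java sketch that has no Flink dependency. It rests on a few assumptions: a hypothetical SessionEvent type, tumbling windows aligned to the epoch, and "latest event per user wins" as the per-user step. Stage one stands in for the keyBy(userId) operator. Stage two stands in for the all-window aggregate that averages per window.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type, loosely mirroring the Event POJO in the quoted code.
final class SessionEvent {
    final long userId;
    final long length;     // cumulative session length so far
    final long timestamp;  // event time in epoch milliseconds

    SessionEvent(long userId, long length, long timestamp) {
        this.userId = userId;
        this.length = length;
        this.timestamp = timestamp;
    }
}

final class TwoStageAverage {
    // Returns window start -> average of the latest length of each user in that window.
    static Map<Long, Double> averagePerWindow(List<SessionEvent> events, long windowSizeMs) {
        // Stage 1 (the keyBy(userId) step): per window, keep each user's latest event.
        Map<Long, Map<Long, SessionEvent>> latest = new TreeMap<>();
        for (SessionEvent e : events) {
            long windowStart = e.timestamp - Math.floorMod(e.timestamp, windowSizeMs);
            Map<Long, SessionEvent> perUser = latest.computeIfAbsent(windowStart, k -> new HashMap<>());
            SessionEvent prev = perUser.get(e.userId);
            if (prev == null || e.timestamp >= prev.timestamp) {
                perUser.put(e.userId, e);
            }
        }
        // Stage 2 (the all-window step): average the per-user values of each window.
        Map<Long, Double> result = new TreeMap<>();
        for (Map.Entry<Long, Map<Long, SessionEvent>> w : latest.entrySet()) {
            long sum = 0;
            for (SessionEvent e : w.getValue().values()) {
                sum += e.length;
            }
            result.put(w.getKey(), (double) sum / w.getValue().size());
        }
        return result;
    }
}
```

For example, take user 1 with lengths 100, 150 and 160 and user 2 with length 100, all within the first minute. With a 60 s window this gives 130.0 for window 0.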


timeWindowAll is a non-parallel operator (it always runs with
parallelism 1), but that is fine as long as you don't have huge
throughput requirements. If that becomes a problem, we would have to
think about pre-aggregating in parallel.
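Plamen describes the pre-aggregation idea further down the thread: distribute the users, accumulate, then merge. The arithmetic of that idea can be sketched in plain Java; this is not Flink code. Each simulated subtask keeps a partial sum and count for the users hashed to it, and a single final step merges the partials. As a result, only one small accumulator per subtask has to reach the non-parallel stage. SumCount and ParallelPreAggregation are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Partial average: running sum and count (hypothetical accumulator type).
final class SumCount {
    long sum;
    long count;

    void add(long value) {
        sum += value;
        count++;
    }

    // Combines two partial results; this is the work left for the single final step.
    SumCount merge(SumCount other) {
        SumCount m = new SumCount();
        m.sum = sum + other.sum;
        m.count = count + other.count;
        return m;
    }

    double average() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }
}

final class ParallelPreAggregation {
    // userIds[i] and lengths[i] describe one user's session length.
    static double average(long[] userIds, long[] lengths, int parallelism) {
        // Parallel step: each simulated subtask accumulates only the users hashed to it.
        List<SumCount> partials = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partials.add(new SumCount());
        }
        for (int i = 0; i < userIds.length; i++) {
            int subtask = (int) Math.floorMod(userIds[i], (long) parallelism);
            partials.get(subtask).add(lengths[i]);
        }
        // Non-parallel step: merge the partial accumulators into one result.
        SumCount total = new SumCount();
        for (SumCount partial : partials) {
            total = total.merge(partial);
        }
        return total.average();
    }
}
```

The result does not depend on the parallelism, because the sum and count of the merged partials equal those of the whole input.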

Does this help?

– Ufuk

On Fri, Dec 15, 2017 at 4:56 PM, Plamen Paskov
<> wrote:
> In my case I have a lot of users with one session per user. What I'm
> thinking of is to distribute the users evenly, accumulate per partition,
> and finally merge all accumulators. The problem is that I don't know how
> to achieve this.
> On 15.12.2017 17:52, Ufuk Celebi wrote:
>> You can first aggregate the length per user and emit it downstream.
>> Then you apply an all-window and average all lengths. Does that make
>> sense?
>> On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
>> <> wrote:
>>> I think I got your point.
>>> What happens now: in order to use aggregate() I need a window, but the
>>> window requires keyBy() if I want to parallelize the data. In my case
>>> that will not work, because with keyBy("userId") the average will be
>>> calculated per userId, but I want the average across all users. What
>>> would be the solution in this case?
>>> Thanks
>>> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>>> Hey Plamen,
>>>> I think what you are looking for is the AggregateFunction. You can
>>>> use it on keyed streams. The Javadoc [1] contains an example for your
>>>> use case (averaging).
>>>> – Ufuk
>>>> [1]
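An averaging AggregateFunction keeps a (sum, count) accumulator. The sketch below shows that shape without a Flink dependency. AggFn is a hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge). A real job would implement org.apache.flink.api.common.functions.AggregateFunction instead, and its exact signatures should be checked against the Javadoc of the Flink version in use.

```java
// Hypothetical stand-in that mirrors the method names of Flink's AggregateFunction.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Averages session lengths; the accumulator is {sum, count}.
final class AverageLength implements AggFn<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long length, long[] acc) {
        acc[0] += length;
        acc[1] += 1;
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        // Cast before dividing to avoid integer division.
        return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}
```

Because merge only adds sums and counts, partial accumulators can be combined in any order. That property is what makes the function usable for merging windows or combining partial results.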
>>>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>>>> <> wrote:
>>>>> Hi,
>>>>> I'm trying to calculate the running average of session length, and I
>>>>> want to trigger the computation at a regular interval, say every 2
>>>>> minutes. I'm trying to do it like this:
>>>>> package flink;
>>>>> import lombok.AllArgsConstructor;
>>>>> import lombok.NoArgsConstructor;
>>>>> import lombok.ToString;
>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>> import java.sql.Timestamp;
>>>>> import java.time.Instant;
>>>>> import java.time.LocalDateTime;
>>>>> import java.util.TimeZone;
>>>>> public class StreamingJob {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>         // Input lines: userId,sessionId,length,yyyy-MM-dd HH:mm:ss
>>>>>         SingleOutputStreamOperator<Event> sessions = env
>>>>>                 .socketTextStream("localhost", 9000, "\n")
>>>>>                 .map(new MapFunction<String, Event>() {
>>>>>                     @Override
>>>>>                     public Event map(String value) throws Exception {
>>>>>                         String[] row = value.split(",");
>>>>>                         return new Event(Long.valueOf(row[0]), row[1],
>>>>>                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>>>                     }
>>>>>                 })
>>>>>                 .assignTimestampsAndWatermarks(
>>>>>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>>>                             @Override
>>>>>                             public long extractTimestamp(Event element) {
>>>>>                                 return element.timestamp;
>>>>>                             }
>>>>>                         })
>>>>>                 // Rolling aggregation: emits the current max for every incoming event.
>>>>>                 .keyBy("userId", "sessionId")
>>>>>                 .maxBy("length");
>>>>>         sessions
>>>>>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>>>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>>>                     @Override
>>>>>                     public void apply(TimeWindow window, Iterable<Event> values,
>>>>>                                       Collector<Avg> out) throws Exception {
>>>>>                         long sum = 0;
>>>>>                         int count = 0;
>>>>>                         for (Event event : values) {
>>>>>                             sum += event.length;
>>>>>                             count++;
>>>>>                         }
>>>>>                         // Cast before dividing; sum / count alone would be integer division.
>>>>>                         double avg = (double) sum / count;
>>>>>                         LocalDateTime windowStart = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
>>>>>                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
>>>>>                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
>>>>>                     }
>>>>>                 })
>>>>>                 // Without a sink the averages are computed but never emitted anywhere.
>>>>>                 .print();
>>>>>         env.execute();
>>>>>     }
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Avg {
>>>>>         public double length;
>>>>>         public String windowStart;
>>>>>         public String windowEnd;
>>>>>     }
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Event {
>>>>>         public long userId;
>>>>>         public String sessionId;
>>>>>         public long length;
>>>>>         public long timestamp;
>>>>>     }
>>>>> }
>>>>> First I want to extract the last event of every user session, because
>>>>> it contains the total session length. Then I want to calculate the
>>>>> average session length from the output of that step (the sessions
>>>>> variable).
>>>>> Example:
>>>>> 1,s1,100,2017-12-13 11:58:01
>>>>> 1,s1,150,2017-12-13 11:58:02
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>> The sessions variable should contain these rows:
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>> but instead it emits the current max-length row of the corresponding
>>>>> user session for every incoming event.
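keyBy(...).maxBy(...) is a rolling aggregation: it emits an updated record for every incoming event. That is why one row per event comes out instead of one row per session. The intended result, the last row per (userId, sessionId), can be sketched in plain Java on the example rows above. LastEventPerSession is a hypothetical helper, and "latest timestamp wins" is an assumption based on the statement that the last event carries the total length.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LastEventPerSession {
    // Keeps, for each "userId,sessionId" key, the row with the latest timestamp.
    static List<String> lastRows(List<String> rows) {
        Map<String, String> lastRow = new LinkedHashMap<>();
        Map<String, Long> lastTs = new LinkedHashMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            String key = f[0] + "," + f[1];
            long ts = Timestamp.valueOf(f[3]).getTime();
            Long prev = lastTs.get(key);
            if (prev == null || ts >= prev) {
                lastTs.put(key, ts);
                lastRow.put(key, row);
            }
        }
        return new ArrayList<>(lastRow.values());
    }

    // Average of the length column; the cast avoids integer division.
    static double averageLength(List<String> rows) {
        long sum = 0;
        for (String row : rows) {
            sum += Long.parseLong(row.split(",")[2]);
        }
        return (double) sum / rows.size();
    }
}
```

On the four example rows this keeps the 160 and 100 rows, and their average is 130.0.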
>>>>> Questions:
>>>>> - How do I collect the data for all groups in the sessions variable?
>>>>> - Is there another way to achieve this? With my implementation the
>>>>> average will be computed on a single node, because sessions is of type
>>>>> SingleOutputStreamOperator<Event>.
>>>>> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>>>>> Thanks
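The quoted code calls timeWindowAll(Time.seconds(60), Time.seconds(30)). With a sliding window like this, every element is assigned to size / slide windows (two here). One window ends per slide, so with event time a result is emitted roughly every 30 seconds as the watermark advances. The sketch below computes that assignment in plain Java. It assumes a zero window offset and windows aligned to the epoch, and SlidingWindows is a hypothetical name, not Flink's assigner.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindows {
    // Returns the [start, end) bounds of every sliding window that contains the
    // timestamp, assuming zero offset and windows aligned to the epoch.
    static List<long[]> assign(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }
}
```

For example, an event at 45 s falls into [30 s, 90 s) and [0 s, 60 s), so it contributes to two consecutive averages.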

