You can first aggregate the length per user and emit it downstream.
Then you apply an all-window (windowAll) and average all of the lengths. Does that make sense?
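Here is a plain-Java sketch of that two-stage idea. It has no Flink dependency, and the class, method and field names are hypothetical. It assumes that each per-user step emits a partial sum and count rather than a finished average, so that the global step computes the exact average over all sessions. In Flink, the first stage would run on the keyed (parallel) stream and the second in the single, non-parallel windowAll operator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration (no Flink): per-user partial aggregates,
// followed by one global combine step.
class TwoStageAverage {

    /** Partial aggregate for one user: total session length and number of sessions. */
    static final class Partial {
        long sum;
        long count;
    }

    /** Stage 1 (the keyed, parallelizable step): one Partial per userId. */
    static Map<Long, Partial> perUser(List<long[]> sessions) {
        Map<Long, Partial> partials = new HashMap<>();
        for (long[] s : sessions) {               // s[0] = userId, s[1] = session length
            Partial p = partials.computeIfAbsent(s[0], k -> new Partial());
            p.sum += s[1];
            p.count++;
        }
        return partials;
    }

    /** Stage 2 (the single all-window step): combine all partials into one average. */
    static double globalAverage(Map<Long, Partial> partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials.values()) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? Double.NaN : (double) sum / count;
    }
}
```

With user 1 having sessions of length 160 and 20 and user 2 one session of length 100, this gives 280 / 3 (about 93.33). Averaging the two per-user averages (90 and 100) would instead give 95, because it weights every user equally regardless of how many sessions they have.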

> I think I got your point.
> Here is the problem now: to use aggregate() I need a window, but the
> window requires keyBy() if I want to parallelize the data. In my case that
> will not work, because if I use keyBy("userId"), the average
> will be calculated per userId, but I want the average across all users. What
> would be the solution in this case?
>> Hey Plamen,
>> I think what you are looking for is the AggregateFunction, which you
>> can use on keyed streams. The Javadoc [1] contains an example for your
>> use case (averaging).
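Flink's `AggregateFunction<IN, ACC, OUT>` is defined by four methods: `createAccumulator()`, `add(value, accumulator)`, `getResult(accumulator)` and `merge(a, b)`. The sketch below implements that same shape for averaging session lengths as a stand-alone class, so it compiles without Flink. In a real job you would declare it as `implements AggregateFunction<Long, AverageAggregate.Acc, Double>` and pass an instance to `aggregate(...)` on a windowed stream. The class and accumulator names are hypothetical.

```java
// Stand-alone sketch mirroring the method shape of Flink's AggregateFunction<IN, ACC, OUT>
// (IN = Long session length, ACC = Acc, OUT = Double average). No Flink dependency here.
class AverageAggregate {

    /** Accumulator: running sum and count of session lengths. */
    static class Acc {
        public long sum;
        public long count;
    }

    /** Starts an empty accumulator. */
    public Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one session length into the accumulator. */
    public Acc add(Long length, Acc acc) {
        acc.sum += length;
        acc.count++;
        return acc;
    }

    /** Uses floating-point division so the average is not truncated. */
    public Double getResult(Acc acc) {
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    /** Combines two partial accumulators (Flink uses this, for example, when merging windows). */
    public Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.sum = a.sum + b.sum;
        merged.count = a.count + b.count;
        return merged;
    }
}
```

Keeping a sum and a count in the accumulator, rather than a running average, is what makes `merge` exact.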
>>> Hi,
>>> I'm trying to calculate the running average of the session length, and I want
>>> to trigger the computation at a regular interval, say every 2 minutes. I'm
>>> trying to do it like this:
>>> package flink;
>>> import lombok.AllArgsConstructor;
>>> import lombok.NoArgsConstructor;
>>> import lombok.ToString;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>> import org.apache.flink.util.Collector;
>>> import java.sql.Timestamp;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.util.TimeZone;
>>> public class StreamingJob {
>>>      public static void main(String[] args) throws Exception {
>>>          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>          SingleOutputStreamOperator<Event> sessions = env
>>>                  .socketTextStream("localhost", 9000, "\n")
>>>                  .map(new MapFunction<String, Event>() {
>>>                      @Override
>>>                      public Event map(String value) throws Exception {
>>>                          // CSV row: userId,sessionId,length,timestamp
>>>                          String[] row = value.split(",");
>>>                          return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>                      }
>>>                  })
>>>                  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>                      @Override
>>>                      public long extractTimestamp(Event element) {
>>>                          return element.timestamp;
>>>                      }
>>>                  })
>>>                  // rolling max per (userId, sessionId): emits an updated record for every incoming event
>>>                  .keyBy("userId", "sessionId")
>>>                  .maxBy("length");
>>>          sessions
>>>                  .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>                  .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>                      @Override
>>>                      public void apply(TimeWindow window, Iterable<Event> values, Collector<Avg> out) throws Exception {
>>>                          long sum = 0;
>>>                          int count = 0;
>>>                          for (Event event : values) {
>>>                              sum += event.length;
>>>                              count++;
>>>                          }
>>>                          // cast before dividing; long / int would truncate the average
>>>                          double avg = (double) sum / count;
>>>                          LocalDateTime windowStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
>>>                          LocalDateTime windowEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
>>>                          out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
>>>                      }
>>>                  });
>>>          env.execute();
>>>      }
>>>      @AllArgsConstructor
>>>      @NoArgsConstructor
>>>      @ToString
>>>      public static class Avg {
>>>          public double length;
>>>          public String windowStart;
>>>          public String windowEnd;
>>>      }
>>>      @AllArgsConstructor
>>>      @NoArgsConstructor
>>>      @ToString
>>>      public static class Event {
>>>          public long userId;
>>>          public String sessionId;
>>>          public long length;
>>>          public long timestamp;
>>>      }
>>> }
>>> First I want to extract the last session event for every user session,
>>> because it contains the total session length. Then I want to calculate
>>> the average session length based on the output of the
>>> previous operation (the sessions variable).
>>> Example:
>>> 1,s1,100,2017-12-13 11:58:01
>>> 1,s1,150,2017-12-13 11:58:02
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>> The sessions variable should contain these rows:
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>> but instead it returns a max-length row for every incoming event.
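The intended result can be checked with a plain-Java sketch over the example rows above. It uses no Flink, and the names are hypothetical. It keeps the maximum length per (userId, sessionId) key, assuming that lengths only grow within a session as in the example, so the maximum equals the last value. It then averages those per-session values. In a streaming job, this "final row per session" view only exists once a window or timer decides that the session is complete, whereas `maxBy` on a keyed stream emits an updated record for every input.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java check of the intended result (no Flink involved).
// Rows have the form "userId,sessionId,length,timestamp".
class LastSessionAverage {

    /** Keeps the largest length seen for each userId + sessionId pair. */
    static Map<String, Long> finalLengths(List<String> rows) {
        Map<String, Long> byKey = new LinkedHashMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            String key = f[0] + "/" + f[1];           // composite key: userId/sessionId
            long length = Long.parseLong(f[2]);
            byKey.merge(key, length, Long::max);      // keep the larger length per key
        }
        return byKey;
    }

    /** Average of the per-session final lengths. */
    static double average(Map<String, Long> finalLengths) {
        long sum = 0;
        for (long len : finalLengths.values()) {
            sum += len;
        }
        return finalLengths.isEmpty() ? Double.NaN : (double) sum / finalLengths.size();
    }
}
```

For the four example rows, this yields the two expected entries (`1/s1` with 160 and `2/s1` with 100) and an average of 130.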
>>> Questions:
>>> - How do I collect the data for all groups in the sessions variable?
>>> - Is there another way to achieve this? With my implementation, the
>>> average will be computed on a single node, because sessions
>>> is of type SingleOutputStreamOperator<Event>.
>>> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>>> Thanks

