You can do this by first doing a keyBy on the userId and emitting the
value you want to average (the session length). You then feed the
output of that into an AggregateFunction over a time window, which
emits one average per window:

input.keyBy(user).flatMap(extractSessionLength()).timeWindowAll(time).aggregate(averageAggregate())

timeWindowAll gives you a non-parallel operator (parallelism 1), but
that is fine as long as you don't have huge throughput requirements.
If that becomes a problem, we would have to think about
pre-aggregating in parallel (there is a rough sketch of that at the
bottom of this mail).
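For completeness, averageAggregate() could look something like the
following untested sketch (assuming the Flink 1.4 AggregateFunction
interface and that extractSessionLength() emits the session length as
a plain Long; the Tuple2<Long, Long> (sum, count) accumulator is just
one possible choice):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Keeps a running (sum, count) pair and emits sum / count per window.
public class AverageAggregate
        implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Long sessionLength, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + sessionLength, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        // a time window only fires if it received at least one element,
        // so the count is never zero here
        return (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

merge() is required by the interface; it is what lets Flink combine
partial accumulators (for example for merging windows).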
Does this help?

– Ufuk

On Fri, Dec 15, 2017 at 4:56 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> In my case I have a lot of users with one session per user. What I'm
> thinking is to evenly distribute the users, then accumulate, and
> finally merge all accumulators. The problem is that I don't know how
> to achieve this.
>
>
> On 15.12.2017 17:52, Ufuk Celebi wrote:
>>
>> You can first aggregate the length per user and emit it downstream.
>> Then you do the all window and average all lengths. Does that make
>> sense?
>>
>> On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> I think I got your point.
>>> What happens now: in order to use aggregate() I need a window, but
>>> the window requires keyBy() if I want to parallelize the data. In my
>>> case it will not work, because if I create keyBy("userId") then the
>>> average will be calculated per userId, but I want the average across
>>> all users. What would be the solution in this case?
>>>
>>> Thanks
>>>
>>>
>>> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>>>
>>>> Hey Plamen,
>>>>
>>>> I think what you are looking for is the AggregateFunction. This you
>>>> can use on keyed streams. The Javadoc [1] contains an example for
>>>> your use case (averaging).
>>>>
>>>> – Ufuk
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>
>>>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>>>> <plamen.pas...@next-stream.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to calculate the running average of the session length,
>>>>> and I want to trigger the computation at a regular, let's say
>>>>> 2-minute, interval.
>>>>> I'm trying to do it like this:
>>>>>
>>>>> package flink;
>>>>>
>>>>> import lombok.AllArgsConstructor;
>>>>> import lombok.NoArgsConstructor;
>>>>> import lombok.ToString;
>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> import java.sql.Timestamp;
>>>>> import java.time.Instant;
>>>>> import java.time.LocalDateTime;
>>>>> import java.util.TimeZone;
>>>>>
>>>>>
>>>>> public class StreamingJob {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>
>>>>>         SingleOutputStreamOperator<Event> sessions = env
>>>>>                 .socketTextStream("localhost", 9000, "\n")
>>>>>                 .map(new MapFunction<String, Event>() {
>>>>>                     @Override
>>>>>                     public Event map(String value) throws Exception {
>>>>>                         String[] row = value.split(",");
>>>>>                         return new Event(Long.valueOf(row[0]), row[1],
>>>>>                                 Long.valueOf(row[2]),
>>>>>                                 Timestamp.valueOf(row[3]).getTime());
>>>>>                     }
>>>>>                 })
>>>>>                 .assignTimestampsAndWatermarks(
>>>>>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>>>                     @Override
>>>>>                     public long extractTimestamp(Event element) {
>>>>>                         return element.timestamp;
>>>>>                     }
>>>>>                 })
>>>>>                 .keyBy("userId", "sessionId")
>>>>>                 .maxBy("length");
>>>>>
>>>>>         sessions
>>>>>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>>>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>>>                     @Override
>>>>>                     public void apply(TimeWindow window, Iterable<Event> values,
>>>>>                                       Collector<Avg> out) throws Exception {
>>>>>                         long sum = 0;
>>>>>                         int count = 0;
>>>>>
>>>>>                         for (Event event : values) {
>>>>>                             sum += event.length;
>>>>>                             count++;
>>>>>                         }
>>>>>
>>>>>                         // cast before dividing, otherwise this is
>>>>>                         // integer division and the fraction is lost
>>>>>                         double avg = (double) sum / count;
>>>>>                         LocalDateTime windowStart = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getStart()),
>>>>>                                 TimeZone.getDefault().toZoneId());
>>>>>                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getEnd()),
>>>>>                                 TimeZone.getDefault().toZoneId());
>>>>>                         out.collect(new Avg(avg, windowStart.toString(),
>>>>>                                 windowEnd.toString()));
>>>>>                     }
>>>>>                 });
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Avg {
>>>>>         public double length;
>>>>>         public String windowStart;
>>>>>         public String windowEnd;
>>>>>     }
>>>>>
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Event {
>>>>>         public long userId;
>>>>>         public String sessionId;
>>>>>         public long length;
>>>>>         public long timestamp;
>>>>>     }
>>>>> }
>>>>>
>>>>> First I want to extract the last session event for every
>>>>> user-session, because it contains the total session length.
>>>>> Then I want to calculate the average session length based on the
>>>>> data from the previous operation (based on the sessions variable).
>>>>>
>>>>> Example:
>>>>>
>>>>> 1,s1,100,2017-12-13 11:58:01
>>>>> 1,s1,150,2017-12-13 11:58:02
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>>
>>>>> The sessions variable should contain these rows:
>>>>>
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>>
>>>>> but it is emitting the current max-length row for every incoming
>>>>> event.
>>>>>
>>>>> Questions:
>>>>> - how do I collect the data for all groups in the sessions variable?
>>>>> - is there another way to achieve this functionality? With my
>>>>>   implementation the average will be computed on a single node,
>>>>>   because sessions is of type SingleOutputStreamOperator<Event>
>>>>> - can I use ContinuousEventTimeTrigger to trigger at regular
>>>>>   intervals?
>>>>>
>>>>> Thanks
>>>
>>>
>
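P.S. A rough, untested sketch of the parallel pre-aggregation idea
mentioned above, building on the sessions stream and Event type from
the original program. It assumes tumbling windows in both stages, so
that each stage-1 partial result (which carries its window's end
timestamp) falls into the matching stage-2 window, and it leaves aside
the separate maxBy/deduplication question. Stage 1 computes a per-user
(sum, count) in parallel; the parallelism-1 all-window then only has
to merge one small record per user:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Stage 1 (parallel): partial (sum, count) per user and window.
DataStream<Tuple2<Long, Long>> partials = sessions
        .keyBy("userId")
        .timeWindow(Time.seconds(60))
        .aggregate(new AggregateFunction<Event, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return Tuple2.of(0L, 0L);
            }

            @Override
            public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> acc) {
                return Tuple2.of(acc.f0 + event.length, acc.f1 + 1);
            }

            @Override
            public Tuple2<Long, Long> getResult(Tuple2<Long, Long> acc) {
                return acc;
            }

            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        });

// Stage 2 (parallelism 1): merge the per-user partials of the same
// window and turn them into the global average.
partials
        .timeWindowAll(Time.seconds(60))
        .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        })
        .map(new MapFunction<Tuple2<Long, Long>, Double>() {
            @Override
            public Double map(Tuple2<Long, Long> merged) {
                return (double) merged.f0 / merged.f1;
            }
        })
        .print();

With this shape, the non-parallel operator sees one record per user
per window instead of every event, which should take the throughput
pressure off.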