You can do this by first keying the stream by userId and then emitting
the value you want to average (the session length). Feed that output
into a time-based all-window with an AggregateFunction, which emits
one average per window.

input.keyBy("userId").flatMap(extractSessionLength()).timeWindowAll(time).aggregate(averageAggregate())
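The `averageAggregate()` step above only needs to carry a running sum and count per window. Below is a minimal, dependency-free sketch of that logic. It mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`) but does not implement the Flink interface, so it runs standalone. The class name `AverageAggregate` and the `long[] {sum, count}` accumulator are assumptions for illustration. In a real job the same logic would presumably live in a class implementing `AggregateFunction` that is passed to `aggregate(...)`.

```java
// Plain-Java sketch (no Flink dependency) of the averaging step.
// The four methods mirror the shape of Flink's AggregateFunction, but this
// class does NOT implement that interface; it only models the logic.
public class AverageAggregate {

    // Accumulator layout (assumed): acc[0] = sum of session lengths, acc[1] = count.
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    // Fold one session length into the accumulator and return it.
    public long[] add(long sessionLength, long[] acc) {
        acc[0] += sessionLength;
        acc[1] += 1;
        return acc;
    }

    // Combine two partial accumulators into a new one.
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Cast before dividing: long / long would truncate the average.
    public double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        AverageAggregate agg = new AverageAggregate();
        long[] acc = agg.createAccumulator();
        for (long length : new long[] {160L, 100L}) { // final lengths from the example rows
            acc = agg.add(length, acc);
        }
        System.out.println(agg.getResult(acc)); // prints 130.0
    }
}
```

The zero-count guard in `getResult` is only a defensive choice for this sketch.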

timeWindowAll is a non-parallel operator (it always runs with
parallelism 1), but that is fine as long as you don't have huge
throughput requirements. If it becomes a bottleneck, we would have to
think about pre-aggregating in parallel.
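One possible way to pre-aggregate in parallel, sketched here under assumptions: spread users over a fixed number of buckets, compute a partial (sum, count) per bucket, and merge only those partials in the non-parallel all-window. The plain-Java code below simulates both phases locally with loops. `NUM_BUCKETS`, `partials` and `mergeAndAverage` are hypothetical names. In Flink, phase 1 might be a keyed window keyed on a bucket id derived from userId, and phase 2 the all-window, but that wiring is not shown here.

```java
// Local simulation (no Flink) of two-phase averaging.
// Phase 1: each bucket builds a partial (sum, count); phase 2 merges the partials.
public class TwoPhaseAverage {
    // Hypothetical stand-in for the parallelism of the pre-aggregation step.
    static final int NUM_BUCKETS = 4;

    // Phase 1: route each user to a bucket and accumulate sum and count there.
    static long[][] partials(long[] userIds, long[] lengths) {
        long[][] partial = new long[NUM_BUCKETS][2];
        for (int i = 0; i < userIds.length; i++) {
            int bucket = (int) Math.floorMod(userIds[i], (long) NUM_BUCKETS);
            partial[bucket][0] += lengths[i];
            partial[bucket][1] += 1;
        }
        return partial;
    }

    // Phase 2: add up sums and counts (never average the per-bucket averages).
    static double mergeAndAverage(long[][] partial) {
        long sum = 0;
        long count = 0;
        for (long[] p : partial) {
            sum += p[0];
            count += p[1];
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        // Made-up sample: five users with one session each.
        long[] userIds = {1L, 2L, 3L, 4L, 5L};
        long[] lengths = {160L, 100L, 40L, 70L, 30L};
        System.out.println(mergeAndAverage(partials(userIds, lengths))); // prints 80.0
    }
}
```

Merging sums and counts rather than per-bucket averages is the important design choice. Averaging averages would give a bucket with one session the same weight as a bucket with thousands. The all-window then receives at most one small partial per bucket per window instead of every event.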

Does this help?

– Ufuk


On Fri, Dec 15, 2017 at 4:56 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> In my case I have a lot of users with one session per user. What I'm
> thinking of is distributing the users evenly, accumulating, and finally
> merging all accumulators. The problem is that I don't know how to achieve this.
>
>
>
> On 15.12.2017 17:52, Ufuk Celebi wrote:
>>
>> You can first aggregate the length per user and emit it downstream.
>> Then you apply the all-window and average all lengths. Does that make
>> sense?
>>
>> On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> I think I got your point.
>>> Here is my problem: to use aggregate() I need a window, but the
>>> window requires keyBy() if I want to parallelize the data. That will
>>> not work in my case, because if I use keyBy("userId"), the average
>>> will be calculated per userId, but I want the average across all users.
>>> What would be the solution in this case?
>>>
>>> Thanks
>>>
>>>
>>> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>>>
>>>> Hey Plamen,
>>>>
>>>> I think what you are looking for is the AggregateFunction, which you
>>>> can use with windows on keyed streams. Its Javadoc [1] contains an
>>>> example for your use case (averaging).
>>>>
>>>> – Ufuk
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>
>>>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>>>> <plamen.pas...@next-stream.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to calculate the running average of session length, and I
>>>>> want to trigger the computation at a regular interval, say every 2
>>>>> minutes. I'm trying to do it like this:
>>>>>
>>>>> package flink;
>>>>>
>>>>> import lombok.AllArgsConstructor;
>>>>> import lombok.NoArgsConstructor;
>>>>> import lombok.ToString;
>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> import java.sql.Timestamp;
>>>>> import java.time.Instant;
>>>>> import java.time.LocalDateTime;
>>>>> import java.util.TimeZone;
>>>>>
>>>>> public class StreamingJob {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>
>>>>>         // Input lines look like "userId,sessionId,length,yyyy-MM-dd HH:mm:ss".
>>>>>         SingleOutputStreamOperator<Event> sessions = env
>>>>>                 .socketTextStream("localhost", 9000, "\n")
>>>>>                 .map(new MapFunction<String, Event>() {
>>>>>                     @Override
>>>>>                     public Event map(String value) throws Exception {
>>>>>                         String[] row = value.split(",");
>>>>>                         return new Event(Long.valueOf(row[0]), row[1],
>>>>>                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>>>                     }
>>>>>                 })
>>>>>                 // Event time comes from the 4th column; allow 10 s of out-of-orderness.
>>>>>                 .assignTimestampsAndWatermarks(
>>>>>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>>>                             @Override
>>>>>                             public long extractTimestamp(Event element) {
>>>>>                                 return element.timestamp;
>>>>>                             }
>>>>>                         })
>>>>>                 // Rolling max per (userId, sessionId): emits an updated record for every input event.
>>>>>                 .keyBy("userId", "sessionId")
>>>>>                 .maxBy("length");
>>>>>
>>>>>         sessions
>>>>>                 // 60 s windows sliding every 30 s over the whole (non-keyed) stream.
>>>>>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>>>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>>>                     @Override
>>>>>                     public void apply(TimeWindow window, Iterable<Event> values,
>>>>>                                       Collector<Avg> out) throws Exception {
>>>>>                         long sum = 0;
>>>>>                         int count = 0;
>>>>>
>>>>>                         for (Event event : values) {
>>>>>                             sum += event.length;
>>>>>                             count++;
>>>>>                         }
>>>>>
>>>>>                         // Cast before dividing; sum / count alone is integer division.
>>>>>                         double avg = (double) sum / count;
>>>>>                         LocalDateTime windowStart = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getStart()),
>>>>>                                 TimeZone.getDefault().toZoneId());
>>>>>                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
>>>>>                                 Instant.ofEpochMilli(window.getEnd()),
>>>>>                                 TimeZone.getDefault().toZoneId());
>>>>>                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
>>>>>                     }
>>>>>                 })
>>>>>                 // Without a sink the window results would not be written anywhere.
>>>>>                 .print();
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Avg {
>>>>>         public double length;      // average session length in the window
>>>>>         public String windowStart;
>>>>>         public String windowEnd;
>>>>>     }
>>>>>
>>>>>     @AllArgsConstructor
>>>>>     @NoArgsConstructor
>>>>>     @ToString
>>>>>     public static class Event {
>>>>>         public long userId;
>>>>>         public String sessionId;
>>>>>         public long length;        // session length reported by this event
>>>>>         public long timestamp;     // event time in epoch milliseconds
>>>>>     }
>>>>> }
>>>>>
>>>>> First I want to extract the last event of every user session, because
>>>>> it contains the total session length. Then I want to calculate the
>>>>> average session length based on the result of that operation (the
>>>>> sessions variable).
>>>>>
>>>>> Example:
>>>>>
>>>>> 1,s1,100,2017-12-13 11:58:01
>>>>> 1,s1,150,2017-12-13 11:58:02
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>>
>>>>> The sessions variable should contain these rows:
>>>>> 1,s1,160,2017-12-13 11:58:03
>>>>> 2,s1,100,2017-12-13 11:58:04
>>>>>
>>>>> but instead it returns the current max-length row for every incoming event.
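The behaviour described above matches how rolling aggregations such as `maxBy` work on a `KeyedStream`: they emit an updated record for every incoming event rather than one final record per key, so the downstream all-window sees every intermediate row. The plain-Java sketch below (no Flink dependency; the class and method names are made up for illustration) replays the four example rows and averages them two ways: once over every rolling update, and once over only the latest value per (userId, sessionId).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Local replay (no Flink) of the example rows: userId, sessionId, length.
public class RollingVsFinal {
    static final String[][] ROWS = {
        {"1", "s1", "100"}, {"1", "s1", "150"}, {"1", "s1", "160"}, {"2", "s1", "100"}
    };

    // Mimics keyBy(userId, sessionId).maxBy(length): one output per input, carrying the max so far.
    static double averageOfRollingUpdates() {
        Map<String, Long> maxSoFar = new LinkedHashMap<>();
        long sum = 0;
        int emitted = 0;
        for (String[] row : ROWS) {
            String key = row[0] + "/" + row[1];
            long max = Math.max(maxSoFar.getOrDefault(key, Long.MIN_VALUE), Long.parseLong(row[2]));
            maxSoFar.put(key, max);
            sum += max; // every update reaches the downstream window
            emitted++;
        }
        return (double) sum / emitted;
    }

    // What the question asks for: only the final length per user session is averaged.
    static double averageOfFinalLengths() {
        Map<String, Long> finalLength = new LinkedHashMap<>();
        for (String[] row : ROWS) {
            finalLength.merge(row[0] + "/" + row[1], Long.parseLong(row[2]), Long::max);
        }
        long sum = 0;
        for (long v : finalLength.values()) {
            sum += v;
        }
        return (double) sum / finalLength.size();
    }

    public static void main(String[] args) {
        System.out.println(averageOfRollingUpdates()); // prints 127.5
        System.out.println(averageOfFinalLengths());   // prints 130.0
    }
}
```

The gap between 127.5 and 130.0 comes entirely from the intermediate rows (100 and 150) for user 1 being counted as separate sessions.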
>>>>>
>>>>> Questions:
>>>>> - How do I collect the data for all groups in the sessions variable?
>>>>> - Is there another way to achieve this? With my implementation the
>>>>> average will be computed on a single node, because sessions is of
>>>>> type SingleOutputStreamOperator<Event>.
>>>>> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>>>>>
>>>>> Thanks
>>>
>>>
>
