I think I got your point.
What happens now: in order to use aggregate() I need a window, but the window requires keyBy() if I want to parallelize the data. In my case this will not work, because if I use keyBy("userId") the average will be calculated per userId, but I want the average across all users. What would be the solution in this case?

Thanks


On 15.12.2017 15:46, Ufuk Celebi wrote:
Hey Plamen,

I think what you are looking for is the AggregateFunction. This you
can use on keyed streams. The Javadoc [1] contains an example for your
use case (averaging).

– Ufuk

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
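For readers following along, the averaging pattern mentioned above can be sketched in plain Java as a (sum, count) accumulator. This is only an illustration under assumptions: the class AverageSketch and the type AvgAcc are hypothetical names, the method names mirror those of Flink's AggregateFunction, the code has no Flink dependency, and the exact Flink signatures may differ between versions.

```java
public class AverageSketch {
    /** Hypothetical accumulator: a running sum and a running count. */
    static final class AvgAcc {
        long sum;
        long count;
    }

    /** Starts an empty accumulator. */
    static AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    /** Folds one session length into the accumulator. */
    static AvgAcc add(long value, AvgAcc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    /** Combines two partial accumulators, e.g. from different partitions. */
    static AvgAcc merge(AvgAcc a, AvgAcc b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    /** Cast to double so the division is not truncated to an integer. */
    static double getResult(AvgAcc acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgAcc acc = createAccumulator();
        add(160, acc);
        add(100, acc);
        System.out.println(getResult(acc)); // prints 130.0
    }
}
```

Because the accumulator keeps the sum and the count rather than the average itself, two partial accumulators can be merged without losing precision.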

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
Hi,

I'm trying to calculate the running average of session length, and I want to
trigger the computation at a regular interval of, say, 2 minutes. I'm
trying to do it like this:

package flink;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;


public class StreamingJob {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

         SingleOutputStreamOperator<Event> sessions = env
                 .socketTextStream("localhost", 9000, "\n")
                 .map(new MapFunction<String, Event>() {
                     @Override
                     public Event map(String value) throws Exception {
                         String[] row = value.split(",");
                         return new Event(Long.valueOf(row[0]), row[1],
                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
                     }
                 })
                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                     @Override
                     public long extractTimestamp(Event element) {
                         return element.timestamp;
                     }
                 })
                 .keyBy("userId", "sessionId")
                 .maxBy("length");


         sessions
                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                     @Override
                     public void apply(TimeWindow window, Iterable<Event> values, Collector<Avg> out) throws Exception {
                         long sum = 0;
                         int count = 0;

                         for (Event event : values) {
                             sum += event.length;
                             count++;
                         }

                         // Cast first: sum / count alone is integer division and truncates.
                         double avg = (double) sum / count;
                         LocalDateTime windowStart = LocalDateTime.ofInstant(
                                 Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
                                 Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
                     }
                 });

         env.execute();
     }

     @AllArgsConstructor
     @NoArgsConstructor
     @ToString
     public static class Avg {
         public double length;
         public String windowStart;
         public String windowEnd;
     }

     @AllArgsConstructor
     @NoArgsConstructor
     @ToString
     public static class Event {
         public long userId;
         public String sessionId;
         public long length;
         public long timestamp;
     }
}

First I want to extract the last session event for every user-session,
because it contains the total session length. Then I want to calculate the
average session length based on the output of that previous operation
(the sessions variable).

Example:

1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

but it's returning the max length row only for the corresponding event.
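
The behaviour described here can be reproduced with a small plain-Java simulation of a rolling maximum per key. It is a sketch of the semantics only, with no Flink dependency: the class RollingMaxSketch and its helpers are hypothetical names, and it assumes that a rolling aggregation on a keyed stream emits the current maximum for the key of every incoming event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxSketch {
    /** Hypothetical stand-in for the Event class in the job above. */
    static final class Event {
        final long userId;
        final String sessionId;
        final long length;

        Event(long userId, String sessionId, long length) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
        }

        @Override
        public String toString() {
            return userId + "," + sessionId + "," + length;
        }
    }

    /** Emits, for each input event, the longest event seen so far for its key. */
    static List<Event> rollingMaxBy(List<Event> input) {
        Map<String, Event> maxPerKey = new HashMap<>();
        List<Event> emitted = new ArrayList<>();
        for (Event e : input) {
            String key = e.userId + "|" + e.sessionId; // composite key (userId, sessionId)
            Event current = maxPerKey.get(key);
            if (current == null || e.length > current.length) {
                current = e;
                maxPerKey.put(key, e);
            }
            emitted.add(current); // one output per input, not one per key
        }
        return emitted;
    }

    /** The four example rows from this message, without timestamps. */
    static List<Event> sample() {
        List<Event> rows = new ArrayList<>();
        rows.add(new Event(1, "s1", 100));
        rows.add(new Event(1, "s1", 150));
        rows.add(new Event(1, "s1", 160));
        rows.add(new Event(2, "s1", 100));
        return rows;
    }

    public static void main(String[] args) {
        // Emits four rows with lengths 100, 150, 160, 100.
        for (Event e : rollingMaxBy(sample())) {
            System.out.println(e);
        }
    }
}
```

Under that assumption the downstream window would see the intermediate rows (100 and 150 for user 1) as well as the final one, which is why the result differs from the two rows expected above.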

Questions:
- How can I collect the data for all groups in the sessions variable?
- Is there another way to achieve this functionality? With my
implementation the average will be computed on a single node, because
sessions is of type SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?

Thanks
