Thank you for your reply. Please let me know if other classes o full code is
needed. 

/**
 * Count how many total events
*/

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(4, env_config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");

        final int maxEventDelay = 5; // events are out of order by max x
seconds
        DataStream<BizEvent> bizs = env.addSource(new
FlinkKafkaConsumer09<>(KAFKA_TOPIC,
                new BizSchema(), properties)).
                assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks<BizEvent>() {

                    long curTimeStamp;

                    @Override
                    public long extractTimestamp(BizEvent biz, long
currentTimestamp) {
                        curTimeStamp = currentTimestamp;
                        return biz.time.getMillis();
                    }

                    @Override
                    public long getCurrentWatermark() {
                        return (curTimeStamp - (maxEventDelay * 1000));
                    }
                });

        DataStream<Tuple2&lt;BizEvent, Integer>> bizCnt = bizs.flatMap(new
CountBiz());

        DataStream<Tuple2&lt;String, Integer>> bizWindowTotal =
bizCnt.timeWindowAll(Time.of(5, TimeUnit.MINUTES))
              .apply(new SumStartTsAllWindow());

   // Output(start time of windows, counts)
    public static class SumStartTsAllWindow implements
AllWindowFunction<Iterable&lt;Tuple2&lt;BizEvent, Integer>>,
            Tuple2<String, Integer>, TimeWindow> {

        private static DateTimeFormatter timeFormatter =
               
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").withLocale(Locale.GERMAN).
                        withZone(DateTimeZone.forID("Europe/Berlin"));
        @Override
        public void apply(TimeWindow timeWindow,
Iterable<Tuple2&lt;BizEvent, Integer>> values,
                          Collector<Tuple2&lt;String, Integer>> collector)
throws Exception {

            DateTime startTs = new DateTime(timeWindow.getStart(),
DateTimeZone.forID("Europe/Berlin"));


            Iterator<Tuple2&lt;BizEvent, Integer>> it = values.iterator();
            int sum=0;
            while(it.hasNext()){
                Tuple2<BizEvent, Integer> value = it.next();
                sum += value.f1;
            }
            collector.collect(new Tuple2<>(startTs.toString(timeFormatter),
sum));
        }
    }

    // Output (BizEvent, 1)
    public static class CountBiz implements FlatMapFunction<BizEvent,
Tuple2&lt;BizEvent, Integer>> {

        @Override
        public void flatMap(BizEvent bizEvent, Collector<Tuple2&lt;BizEvent,
Integer>> collector) {
            //System.out.println("TIme in count!: " + bizEvent.time);
            collector.collect(new Tuple2<>(bizEvent, (int) 1));
        }
    }



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5151.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to