val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))

val events: DataStream[SomeEventObj] = stream.map(new MyMapFunction)

val tenMinute: DataStream[AggEvents] = events
  .timeWindowAll(Time.of(10, TimeUnit.MINUTES))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(new MyWindowFunction)

val oneHour = tenMinute
  .keyBy(_.mykey)
  .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  .trigger(new MyTriggerFunction)

The above is pseudo code and may still have some syntax errors, but it should do what you are looking for. There is a dependency between the ten-minute window and the one-hour window function, so one will execute after the other.

On Sun, Mar 27, 2016 at 2:20 PM, Chen Bekor <chen.be...@gmail.com> wrote:
> hi all!
>
> I'm just starting my way with flink and I have a design question.
>
> I'm trying to aggregate incoming events (source: kafka topic) on a 10min
> tumbling window in order to calculate the incoming events rate (total per
> minute).
>
> I would like to take this window and perform an additional window (60 min)
> in order to calculate percentiles, std deviation and some other statistics
> on that time window. finally I would like to trigger some business logic in
> case the calculation hits a certain threshold.
>
> my main challenge is - how to chain the two windows together.
>
> any help is appreciated (please send scala example code - I'm not using
> java :) for this project)
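
For the statistics step on the one-hour window (percentiles / std deviation), you would attach a WindowFunction to the keyed window. A rough sketch below -- the AggEvents/HourlyStats fields and the 0.95 quantile are placeholders I made up, adjust them to your actual event classes and thresholds:

    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Placeholder types: AggEvents is assumed to carry the per-minute rate
    // produced by the ten-minute window, keyed by mykey.
    case class AggEvents(mykey: String, rate: Double)
    case class HourlyStats(key: String, mean: Double, stddev: Double, p95: Double)

    class HourlyStatsFunction
        extends WindowFunction[AggEvents, HourlyStats, String, TimeWindow] {

      override def apply(key: String, window: TimeWindow,
                         input: Iterable[AggEvents],
                         out: Collector[HourlyStats]): Unit = {
        // Collect the per-minute rates that fell into this hour and sort
        // them once so the percentile is a simple index lookup.
        val rates = input.map(_.rate).toIndexedSeq.sorted
        val mean = rates.sum / rates.size
        val variance = rates.map(r => (r - mean) * (r - mean)).sum / rates.size
        val p95 = rates((0.95 * (rates.size - 1)).toInt)
        out.collect(HourlyStats(key, mean, math.sqrt(variance), p95))
      }
    }

You would then apply it to the one-hour window and filter for your business-logic threshold, e.g. oneHour.apply(new HourlyStatsFunction).filter(_.p95 > threshold). Note that keyBy(_.mykey) in the Scala API makes the key type String here, which is why the WindowFunction is parameterized that way.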