I have a requirement where I want to aggregate one data stream every
5 minutes and a different data stream every 1 minute. I wrote some example
code to test this out, but the behavior is different from what I expected.
I expected window2 to be called 5 times and window1 to be called once,
but in a 5-minute interval window1 is called once and window2 is also
called only once. Have I understood the windowed function incorrectly? Does
the input play a role in the number of times a window's apply is called?
I use the nc command to write to socket ports 9999 and 9998.



import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow,
TimeWindow}

import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.windowing.windows.Window


  /**
   * Test job that reads text lines from two local sockets, splits each line
   * into lowercase words, and applies a tumbling all-window to each stream:
   * 5 minutes for the stream on port 9999 and 1 minute for the stream on
   * port 9998.
   *
   * Both window functions only print a marker line to stdout and never emit
   * anything through their collector, so the `print()` sinks attached to the
   * windowed streams have no records to print.
   */
  object WindowWordCount {

    /**
     * Builds the two windowed pipelines and runs the job.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // Ingestion time is selected here while the windows below use the
      // event-time tumbling assigner.
      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

      val text = env.socketTextStream("localhost", 9999)
      val text1 = env.socketTextStream("localhost", 9998)

      // Port 9999: words grouped into 5-minute tumbling windows.
      val stream: DataStream[String] =
        text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      val count = stream
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
        .apply(new MyAllWindowFunction)

      count.print()

      // Port 9998: words grouped into 1-minute tumbling windows.
      // NOTE(review): as far as I know Flink only evaluates a window that
      // holds at least one element, so the number of "window2" lines depends
      // on how many distinct minutes actually received input -- confirm
      // against the Flink windowing documentation.
      val counts1 = text1
        .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .apply(new MyAllWindowFunction2)

      counts1.print()

      env.execute("Window Stream WordCount")
    }

    /** Window function used for the 5-minute window; prints a marker and emits nothing. */
    class MyAllWindowFunction extends AllWindowFunction[String, String, TimeWindow] {

      /**
       * Prints a marker line each time it is invoked.
       *
       * @param window the window being evaluated (unused)
       * @param input  the elements in the window (unused)
       * @param out    collector for results (nothing is emitted)
       */
      override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
        System.out.println("timed window1 is called")
      }
    }

    /** Window function used for the 1-minute window; prints a marker and emits nothing. */
    class MyAllWindowFunction2 extends AllWindowFunction[String, String, TimeWindow] {

      /**
       * Prints a marker line each time it is invoked.
       *
       * @param window the window being evaluated (unused)
       * @param input  the elements in the window (unused)
       * @param out    collector for results (nothing is emitted)
       */
      override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
        System.out.println("timed window2 is called")
      }
    }
  }


The output was:

timed window2 is called
timed window1 is called

Reply via email to