Hello. I wrote below codes.
It works extraordinarily. Processing performs after SourceFunction generates all data and quit. If SourceFunction works infinitely, processing is never performed. But, it works well when parallelismForTimestamp is given other value (eg. 3), I want to know the mechanism of processing of below codes. Sincerely, Sung Gon ————— package org.skon.flink import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.CountTrigger import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.util.Collector object ParallelismWithGlobalWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(50L) env.setParallelism(1) val maxParallelism = 4 val parallelismForTimestamp = 4 val stream = env .addSource(new SourceFunction[(Long, Long, Long)] { override def run(ctx: SourceFunction.SourceContext[(Long, Long, Long)]): Unit = { (0 to 250000).foreach(count => ctx.collect((count, 1L, 2L))) } override def cancel(): Unit = {} }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) { override def extractTimestamp(element: (Long, Long, Long)): Long = element._1 }) stream .keyBy(_._2) .window(GlobalWindows.create) .evictor(TimeEvictor.of(Time.seconds(20L))) .trigger(CountTrigger.of(1L)) .apply[(Long, Long, Long)]((_: Long, _: GlobalWindow, elements: Iterable[(Long, Long, Long)], out: Collector[(Long, Long, Long)]) => out.collect(elements.last)) .setParallelism(maxParallelism) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) { override def extractTimestamp(element: (Long, Long, Long)): Long = element._1 }) .setParallelism(parallelismForTimestamp) .keyBy(_._3) .window(TumblingEventTimeWindows.of(Time.seconds(5L))) .reduce((_, v2) => v2) .setParallelism(maxParallelism) .process[(Long, Long, Long)]((value, _, out) => { Console.println(value) out.collect(value) }) .addSink(new CollectionSink[(Long, Long, Long)]) env.execute("Parallelism Test with Global Window") } class CollectionSink[T] extends SinkFunction[T] { private val values: List[String] = List[String]() override def invoke(value: T): Unit = values + value.toString } } —————