Hi,

I have this simple flow:

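import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val senv = StreamExecutionEnvironment.getExecutionEnvironment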
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      (10000 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
      out.collect(elements.map(_._1).toList)
    }
  })
stream
  .print()
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()

senv.execute()

This is a simple job that applies an event-time session window to a stream
of integers and uses process(…) to collect each window's elements into a
list. Late data is sent to a side output.
When I run this job, the late elements are printed to stdout as expected.
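Why 20 to 30 count as late can be illustrated with a small plain-Scala model. It assumes a watermark of 10089 during the one-second pause (an ascending-timestamp assigner emits the highest timestamp seen minus 1; this also assumes parallelism 1), and it is modeled on Flink's window-operator rules as we understand them: a fresh session window [t, t + gap) is dropped once its last timestamp plus the allowed lateness is at or below the watermark, and the dropped element goes to the late side output if t plus the allowed lateness is at or below the watermark. It ignores merging with sessions that are already open. The object and method names are hypothetical, not Flink API.

```scala
object LatenessSketch {
  // Session gap used by the job, in milliseconds.
  val gapMs: Long = 2000L
  // The job sets no allowedLateness(...), so the default of 0 applies.
  val allowedLatenessMs: Long = 0L
  // ASSUMPTION: watermark after 10000..10090 with ascending timestamps and
  // parallelism 1, i.e. the highest timestamp seen minus 1.
  val watermark: Long = 10090L - 1L

  // A fresh session window [t, t + gap) has last timestamp t + gap - 1;
  // it is treated as late once that plus allowed lateness <= watermark.
  def windowIsLate(t: Long): Boolean =
    (t + gapMs - 1L) + allowedLatenessMs <= watermark

  // An element that no window accepted goes to the late side output if
  // its own timestamp plus allowed lateness <= watermark.
  def elementIsLate(t: Long): Boolean =
    t + allowedLatenessMs <= watermark

  // Simplification: ignores merging with sessions that are already open.
  def goesToSideOutput(t: Long): Boolean =
    windowIsLate(t) && elementIsLate(t)

  def main(args: Array[String]): Unit = {
    val late = (20 to 30).map(_.toLong).filter(goesToSideOutput)
    println(s"sent to side output: ${late.mkString(", ")}")
    println(s"10090 late? ${goesToSideOutput(10090L)}")
  }
}
```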

However, when I add a map(…) after process(…), the late data doesn't show up
in the side output and nothing is printed to stdout:
…
.sideOutputLateData(tag)
.process(…)
.map(list => list :+ 42)
…
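For comparison, here is a sketch of the same job with the extra map(…), structured around the documented behavior of Flink side outputs: getSideOutput(tag) returns the side output of the operator it is called on, so in the snippet above, where `stream` would be the result of map(…), it would look at the map operator rather than the window operator. The sketch keeps the window operator's result in a variable, reads the late data from it, and applies map(…) to the main output separately. The object name `LateDataSideOutputSketch` and the variable names are hypothetical; the rest mirrors the original job.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object LateDataSideOutputSketch {
  // Tag for elements whose session window has already been closed.
  val lateTag: OutputTag[Tuple1[Int]] = OutputTag[Tuple1[Int]]("late")

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Keep a handle on the window operator's output: its side output lives here.
    val windowed: DataStream[List[Int]] = senv
      .addSource(new SourceFunction[Int] {
        override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
          (10000 to 10090).foreach(ctx.collect)
          Thread.sleep(1000) // gives the periodic watermark time to advance
          (20 to 30).foreach(ctx.collect) // these arrive behind the watermark
        }
        override def cancel(): Unit = {}
      })
      .map(x => Tuple1(x))
      .assignAscendingTimestamps(_._1)
      .keyBy(_ => 1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .sideOutputLateData(lateTag)
      .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
        override def process(key: Int, context: Context,
                             elements: Iterable[Tuple1[Int]],
                             out: Collector[List[Int]]): Unit =
          out.collect(elements.map(_._1).toList)
      })

    // Further operators are applied to the main output only.
    windowed
      .map(list => list :+ 42)
      .print()

    // Read the late data from the window operator, not from the mapped stream.
    windowed
      .getSideOutput(lateTag)
      .map(a => s"late: $a")
      .print()

    senv.execute()
  }
}
```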

Is this a bug, or is it working as intended? If it's not a bug, does it
mean I cannot add any operator after process(…)?

Thanks
