Hi,

I have a question about how to correctly set up a test that reads input from a locally provided collection in bounded (BATCH) mode and produces outputs at the end of the computation. My test case looks something like the following:

String[] lines = ...;
try (StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment()) {
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<String> input = env.fromCollection(Arrays.asList(lines));
  // the following includes keyBy().window().aggregate()
  DataStream<Iterable<Tuple2<String, Long>>> output = TopWords.getTopWords(input, 5);
  // sink that collects outputs to a (static) list
  SinkFunction<Iterable<Tuple2<String, Long>>> sink = TestSink.create();
  output.addSink(sink);
  env.execute();
  List<Tuple2<String, Long>> results =
      TestSink.outputs(sink).stream().flatMap(Streams::stream).collect(Collectors.toList());
  // results is empty
}

When I remove the window() and the aggregation, I can see outputs at the sink. I tried creating a manual trigger for the window: onElement() is called, but onEventTime() never is. Also, FromElementsFunction never explicitly emits the max watermark after it finishes reading the input, and the Flink runtime does not seem to emit a final watermark either (I cannot see any trace of watermark emission in the logs). It seems I'm doing something obviously wrong; I just cannot figure out how to get the final watermark emitted. Setting autoWatermarkInterval did not help either.
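To make my expectation explicit, here is a toy model in plain Java of what I understand an event-time window with the default trigger to do. It is not Flink's API: the class and method names are made up for illustration, late data is ignored, and the aggregate is just a count. A window's result is only emitted once the watermark reaches the window's max timestamp (end - 1), so if no final Long.MAX_VALUE watermark arrives at the end of the bounded input, nothing is ever emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (hypothetical, NOT Flink's API) of an event-time tumbling window
 * with a count aggregate. Windows fire only when the watermark reaches their
 * max timestamp, mimicking the default event-time trigger. Late data is ignored.
 */
class EventTimeWindowModel {
    private final long size;
    // window start -> element count, for windows that have not fired yet
    private final TreeMap<Long, Long> pending = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    EventTimeWindowModel(long size) {
        this.size = size;
    }

    /** Assigns an element with the given event timestamp to its tumbling window. */
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        pending.merge(start, 1L, Long::sum);
    }

    /** Advances the watermark and fires every window whose max timestamp has been reached. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!pending.isEmpty()) {
            Map.Entry<Long, Long> first = pending.firstEntry();
            long maxTimestamp = first.getKey() + size - 1;
            if (maxTimestamp > watermark) {
                break; // earliest pending window is still open, so later ones are too
            }
            emitted.add("[" + first.getKey() + ", " + (first.getKey() + size)
                    + ") count=" + first.getValue());
            pending.pollFirstEntry();
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With a window size of 1000 and elements at 10, 20 and 1500, nothing is emitted until the watermark reaches 999, and the [1000, 2000) window is only flushed by a final advanceWatermark(Long.MAX_VALUE). In my test it looks as if that last step never happens.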

The test seems to behave the same with Flink 1.16.3 and 1.18.1.

Thanks in advance for any pointers.

 Jan
