Hi,
I have a question about how to correctly set up a test that reads its
input from a locally provided collection in bounded mode and produces its
outputs at the end of the computation. My test case looks something like
the following:
    String[] lines = ...;
    try (StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment()) {
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> input = env.fromCollection(Arrays.asList(lines));
        // the following includes keyBy().window().aggregate()
        DataStream<Iterable<Tuple2<String, Long>>> output =
            TopWords.getTopWords(input, 5);
        // sink that collects outputs to a (static) list
        SinkFunction<Iterable<Tuple2<String, Long>>> sink = TestSink.create();
        output.addSink(sink);
        env.execute();
        // renamed from "output" so it does not clash with the DataStream above
        List<Tuple2<String, Long>> results =
            TestSink.outputs(sink).stream()
                .flatMap(Streams::stream)
                .collect(Collectors.toList());
        // results is empty
    }
When I remove the window() and the aggregation, I can see outputs at the
sink. I tried creating a manual trigger for the window; I can see its
onElement() method called, but onEventTime() is never called. Also, the
FromElementsFunction never explicitly emits a max watermark at the end of
reading the input. It seems that the Flink runtime does not emit the final
watermark either (I cannot see any trace of watermark emission in the
logs). I am probably doing something obviously wrong, but I cannot
figure out how to get the final watermark emitted. Setting
autoWatermarkInterval did not help either.
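
To make the expected mechanics concrete, here is a toy model in plain Java. It is not Flink code, and every name in it (ToyEventTimeWindows, onWatermark, emitted) is hypothetical. It only illustrates the behavior described above: elements are buffered when they arrive, and an event-time window produces output only once a watermark reaches the window's maximum timestamp, so without a final watermark nothing is ever emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows. NOT Flink code; all names are hypothetical. */
class ToyEventTimeWindows {
    private final long windowSize;
    // window end timestamp (exclusive) -> number of buffered elements
    private final TreeMap<Long, Long> pending = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    ToyEventTimeWindows(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Loosely analogous to onElement(): the element is buffered, nothing is emitted. */
    void onElement(long timestamp) {
        long windowEnd = timestamp - Math.floorMod(timestamp, windowSize) + windowSize;
        pending.merge(windowEnd, 1L, Long::sum);
    }

    /** Fires every window whose maximum timestamp (end - 1) is covered by the watermark. */
    void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> window = it.next();
            if (window.getKey() - 1 > watermark) {
                break; // this window and all later ones are not complete yet
            }
            emitted.add("window ending at " + window.getKey()
                    + ": " + window.getValue() + " element(s)");
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ToyEventTimeWindows windows = new ToyEventTimeWindows(10);
        windows.onElement(1);
        windows.onElement(2);
        windows.onElement(12);
        // No watermark has arrived yet, so no window has fired.
        System.out.println("before final watermark: " + windows.emitted());
        // A final "max" watermark flushes every buffered window.
        windows.onWatermark(Long.MAX_VALUE);
        System.out.println("after final watermark: " + windows.emitted());
    }
}
```

In this model the symptom from the test corresponds to onWatermark() never being invoked: the elements are accepted, but the list returned by emitted() stays empty.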
The test seems to behave the same for Flink versions 1.16.3 and 1.18.1.
Thanks in advance for any pointers.
Jan