All,

Hopefully a quick one. I feel like I have seen this answered a few times before, but I can't find an appropriate example. I am trying to run a few tests in which registered timeouts are invoked (snippet below). The simple example shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not yet been invoked.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2-second processing-time timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I even went as far as adding a MapFunction with a sleep after the second process function, to no avail.

Thanks,
Ashish
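
For illustration only, the symptom described above resembles what happens in the plain-Java sketch below. It does not use Flink at all: the class BoundedTimerSketch and its run method are hypothetical names, and the analogy rests on the assumption that a job reading from a bounded collection ends as soon as the input is exhausted, without waiting for pending processing-time timers. Pausing in a debugger would then have the same effect as the lingerMs parameter here, keeping the process alive long enough for the timers to fire.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Flink-free analogy: a bounded input whose timers are
// discarded if the "job" shuts down before they are due.
class BoundedTimerSketch {

    /**
     * Registers one timer per input element, waits lingerMs after the input
     * is exhausted, then shuts down. Returns how many timers actually fired.
     */
    static int run(List<String> inputs, long timerDelayMs, long lingerMs)
            throws InterruptedException {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();

        // "Process" each element: all it does here is register a timer.
        for (String ignored : inputs) {
            timers.schedule(() -> { fired.incrementAndGet(); },
                    timerDelayMs, TimeUnit.MILLISECONDS);
        }

        // Input is exhausted. Optionally stay alive for a while
        // (this is what a breakpoint effectively does).
        Thread.sleep(lingerMs);

        // Shut down: timers that are still pending are discarded, not fired.
        timers.shutdownNow();
        timers.awaitTermination(1, TimeUnit.SECONDS);
        return fired.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> inputs = List.of("a", "b", "c");
        // Shut down immediately: no timer gets a chance to fire.
        System.out.println("fired without linger: " + run(inputs, 200, 0));
        // Stay alive past the timer delay: every timer fires.
        System.out.println("fired with linger:    " + run(inputs, 200, 600));
    }
}
```

If that assumption holds for the pipeline above, it would also explain why a sleep inside a downstream MapFunction does not help: it only delays individual elements and does not keep the job alive after the last one has been processed.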