Hi, I'm afraid there is no way of making this work with the current implementation. Especially getting this to work in a distributed setting seems hard.
I'm very open for suggestions on this topic, though. :-) Cheers, Aljoscha On Mon, 23 Jan 2017 at 23:19 Steven Ruppert <ste...@fullcontact.com> wrote: > Hi, > > I'm attempting to unit test link with the flink-test-utils support, on > flink 1.1.4. I've got basic flatMap stuff flowing through just fine, > but when running any processing time-based windowing functions, > `env.execute()` will return before any values are flushed out of the > windows. > > import org.apache.flink.api.java.tuple.Tuple2; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.sink.SinkFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.util.TestStreamEnvironment; > import org.junit.Test; > > import java.util.concurrent.atomic.AtomicBoolean; > > import static org.junit.Assert.assertTrue; > > public class TestMinimal { > static AtomicBoolean sinked = new AtomicBoolean(false); > @Test > public void testThing() throws Exception { > StreamExecutionEnvironment env = > TestStreamEnvironment.getExecutionEnvironment(); > > env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1)) > .keyBy(0) > .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) > .sum(1) > .addSink(new SinkFunction<Tuple2<String, Integer>>() { > @Override > public void invoke(Tuple2<String, Integer> value) > throws Exception { > sinked.set(true); > } > }); > env.execute(); > // presumably once execute returns, all elements have passed > through all operators. > assertTrue(sinked.get()); > } > } > > Is there a way to make this test pass? > > Using event time windows instead does seem to work, but processing > time would be a little more convenient. > > -- > *CONFIDENTIALITY NOTICE: This email message, and any documents, files or > previous e-mail messages attached to it is for the sole use of the intended > recipient(s) and may contain confidential and privileged information. Any > unauthorized review, use, disclosure or distribution is prohibited. If you > are not the intended recipient, please contact the sender by reply email > and destroy all copies of the original message.* >