Hi,

I'm attempting to unit test Flink with the flink-test-utils support, on
Flink 1.1.4. Basic flatMap pipelines flow through just fine, but when I
run any processing-time-based windowing function, `env.execute()`
returns before any values are flushed out of the windows.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class TestMinimal {
    static AtomicBoolean sinked = new AtomicBoolean(false);
    @Test
    public void testThing() throws Exception {
        StreamExecutionEnvironment env =
            TestStreamEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
            .keyBy(0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .sum(1)
            .addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value) throws Exception {
                    sinked.set(true);
                }
            });
        env.execute();
        // presumably once execute returns, all elements have passed
        // through all operators.
        assertTrue(sinked.get());
    }
}

Is there a way to make this test pass?

Using event time windows instead does seem to work, but processing
time would be a little more convenient.
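
One possible explanation for the difference, sketched as a toy model in plain Java (no Flink classes; `ToyWindow`, `runEventTime` and `runProcessingTime` are made-up names). It rests on two assumptions about the runtime that are worth checking against the Flink version in use: a finite source ends with a `Long.MAX_VALUE` watermark, which fires every pending event-time window, while a processing-time timer that is still pending when the job shuts down is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single 1-second tumbling window. Plain Java, not Flink code;
// every name here is hypothetical and exists only for illustration.
public class ToyWindow {
    static final long WINDOW_END = 1000L; // the window covers [0, 1000) ms

    // Event time: the window fires once a watermark reaches its last timestamp.
    public static List<Integer> runEventTime(int[] values) {
        List<Integer> sink = new ArrayList<>();
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        // Assumption: a finite source finishes with a Long.MAX_VALUE watermark.
        long finalWatermark = Long.MAX_VALUE;
        if (finalWatermark >= WINDOW_END - 1) {
            sink.add(sum);
        }
        return sink;
    }

    // Processing time: the window fires only if the wall clock reaches the
    // window end while the job is still running.
    public static List<Integer> runProcessingTime(int[] values, long jobRuntimeMillis) {
        List<Integer> sink = new ArrayList<>();
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        // Assumption: a timer still pending at shutdown is dropped, not fired.
        if (jobRuntimeMillis >= WINDOW_END) {
            sink.add(sum);
        }
        return sink;
    }

    public static void main(String[] args) {
        int[] values = {1, 1};
        System.out.println("event time:      " + runEventTime(values));
        System.out.println("processing time: " + runProcessingTime(values, 5L));
    }
}
```

In this model a job that finishes after a few milliseconds emits the sum in the event-time case and nothing in the processing-time case, which matches the behaviour of the test above.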
