Hey guys, i Incurred in situation and i need you help. im trying Using unit test inorder to check my results, first my timeWindow is set for 15sec, but the assertyEquals doesnt wait for the window getting the answer,
so everything is telling me index 0 out of bounds (cuze its didnt get to place my object in the list yet) thank you all! import org.apache.flink.annotation.Public; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.util.AbstractTestBase; import org.com.CameraEvent; import org.com.StreamingJob; import org.junit.Test; import java.util.ArrayList; import java.util.Date; import java.util.LinkedList; import java.util.List; import static org.junit.Assert.assertEquals; public class IntergrationTest extends AbstractTestBase { @Test public void test() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // configure your test environment env.setParallelism(1); // values are collected in a static variable CollectSink.values.clear(); LinkedList<CameraEvent> events = GenerateEvents(); env.fromCollection(events) .keyBy(new StreamingJob.GetKey()) .timeWindow(Time.seconds(10)) .minBy("dateTime") .addSink(new CollectSink()); env.execute("lala"); assertEquals(events.get(1), CollectSink.values.get(0)); } private static LinkedList<CameraEvent> GenerateEvents() { LinkedList<CameraEvent> linkedList; CameraEvent cameraEvent; linkedList = new LinkedList<>(); for (int i = 0; i < 2; i++) { cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR"); linkedList.add(cameraEvent); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return linkedList; } private static class CollectSink implements SinkFunction<CameraEvent> { // must be static public static final List<CameraEvent> values = new ArrayList<>(); @Override public synchronized void invoke(CameraEvent value) throws Exception { values.add(value); } } }