Hey guys,
i Incurred in situation and i need you help.

im trying Using unit test inorder to check my results,
first my  timeWindow is set for 15sec, but the assertyEquals doesnt wait
for the window getting the answer,

so everything is telling me index 0 out of bounds (cuze its didnt get to
place my object in the list yet)

thank you all!

 import org.apache.flink.annotation.Public;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {



    @Test
    public void test() throws Exception {

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();
        LinkedList<CameraEvent> events = GenerateEvents();
        env.fromCollection(events)
                .keyBy(new StreamingJob.GetKey())
                .timeWindow(Time.seconds(10))
                .minBy("dateTime")
                .addSink(new CollectSink());

        env.execute("lala");
               assertEquals(events.get(1), CollectSink.values.get(0));
    }

   private static LinkedList<CameraEvent> GenerateEvents() {
        LinkedList<CameraEvent> linkedList;
        CameraEvent cameraEvent;
        linkedList = new LinkedList<>();
        for (int i = 0; i < 2; i++) {

            cameraEvent = new CameraEvent("123-123-12", 1, new Date(),
"OUT", "CAR");

            linkedList.add(cameraEvent);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return linkedList;
    }
    private static class CollectSink implements SinkFunction<CameraEvent> {

        // must be static
        public static final List<CameraEvent> values = new ArrayList<>();

        @Override
        public synchronized void invoke(CameraEvent value) throws Exception {
            values.add(value);
        }
    }


}

Reply via email to