Re: How to test window

2018-12-20 Thread Chesnay Schepler
Since you define a 15 second window you have to ensure that your source 
generates at least 15 seconds worth of data; otherwise the window will 
never fire.
Since you do not use event-time your source has to actually run for at 
least 15 seconds; for this case collection sources will simply not work. 
You need a custom SourceFunction that emits your data over a 15 + x 
seconds period.


On 20.12.2018 15:12, עדן שרקון wrote:

Hey guys,
i Incurred in situation and i need you help.

im trying Using unit test inorder to check my results,
first my  timeWindow is set for 15sec, but the assertyEquals doesnt 
wait for the window getting the answer,


so everything is telling me index 0 out of bounds (cuze its didnt get 
to place my object in the list yet)


thank you all!

import org.apache.flink.annotation.Public;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class IntergrationTest extends AbstractTestBase {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// configure your test environment
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
LinkedList events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());
env.execute("lala");
assertEquals(events.get(1), CollectSink.values.get(0));
}
private static LinkedList GenerateEvents() {
LinkedList linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {
cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");
linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction {
// must be static
public static final List values = new ArrayList<>();
@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}
}





How to test window

2018-12-20 Thread עדן שרקון
Hey guys,
i Incurred in situation and i need you help.

im trying Using unit test inorder to check my results,
first my  timeWindow is set for 15sec, but the assertyEquals doesnt wait
for the window getting the answer,

so everything is telling me index 0 out of bounds (cuze its didnt get to
place my object in the list yet)

thank you all!

 import org.apache.flink.annotation.Public;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {



@Test
public void test() throws Exception {

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

// configure your test environment
env.setParallelism(1);

// values are collected in a static variable
CollectSink.values.clear();
LinkedList events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());

env.execute("lala");
   assertEquals(events.get(1), CollectSink.values.get(0));
}

   private static LinkedList GenerateEvents() {
LinkedList linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {

cameraEvent = new CameraEvent("123-123-12", 1, new Date(),
"OUT", "CAR");

linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction {

// must be static
public static final List values = new ArrayList<>();

@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}


}