Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to 
do something like:

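// Type parameters: input 1 type, input 2 type, output type.
// myoperator is the two-input operator under test.
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
      new TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

// Advance the watermark on both inputs, then send an element on input 1.
testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

// Advance the watermark again, then send an element on input 2.
testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));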

and then use testHarness.getOutput() to get the output and compare it against 
the expected one.
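
To illustrate the idea of "feed both inputs in a fixed order, then compare the 
output", here is a rough, Flink-free sketch in plain Java. Everything in it is 
hypothetical and only stands in for your own code: TwoInputFunction is not a 
Flink interface, and PairWithLatest is a made-up example of logic whose result 
depends on which input arrives first. With the harness above you would do the 
same thing via processElement1/processElement2 and getOutput().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TwoInputOrderingSketch {

    /** Hypothetical stand-in for a two-input operator; NOT a Flink interface. */
    interface TwoInputFunction<A, B, OUT> {
        void onFirst(A value, Consumer<OUT> out);
        void onSecond(B value, Consumer<OUT> out);
    }

    /**
     * Made-up example logic: remember the latest input-1 value and attach it
     * to every input-2 element. The output depends on arrival order.
     */
    static class PairWithLatest implements TwoInputFunction<Integer, String, String> {
        private Integer lastSeen; // null until an input-1 element has arrived

        @Override
        public void onFirst(Integer value, Consumer<String> out) {
            lastSeen = value;
        }

        @Override
        public void onSecond(String value, Consumer<String> out) {
            out.accept(lastSeen == null ? value + ":none" : value + ":" + lastSeen);
        }
    }

    /** Input 1 arrives before input 2. */
    static List<String> firstThenSecond() {
        List<String> output = new ArrayList<>();
        PairWithLatest fn = new PairWithLatest();
        fn.onFirst(5, output::add);
        fn.onSecond("6", output::add);
        return output;
    }

    /** Input 2 arrives before input 1. */
    static List<String> secondThenFirst() {
        List<String> output = new ArrayList<>();
        PairWithLatest fn = new PairWithLatest();
        fn.onSecond("6", output::add);
        fn.onFirst(5, output::add);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(firstThenSecond());
        System.out.println(secondThenFirst());
    }
}
```

Because the test decides the call order itself, each ordering becomes its own 
deterministic test case instead of depending on how the sources happen to 
interleave at runtime.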

If you have access to the Flink source code, I recommend having a look at 
CoProcessOperatorTest for an example.

Or you can find it here: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java

Hope this helps,
Kostas


> On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <tovi.so...@citi.com> wrote:
> 
> Hi group,
> 
>  
> 
> What is the best practice for testing CoFlatMap operator correctness?
> 
> We have two source functions, each emitting data to a stream, and a connect 
> between them, and I want to make sure that when a streamA element arrives 
> before an element of the other stream, a certain behavior happens.
> 
> How can I test this case?
> 
> Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> and emitting a timestamp and watermark per element didn’t help; the elements 
> still arrive in an unexpected order.
>  
> Thanks in advance,
> Tovi
