Hi,

I tried to mock the async operator, which takes a Tuple1 as input and
produces a Tuple3 as output, but while creating a test harness I couldn't
find the right TupleSerializer. Can anyone help me with this?


public class GetMetadataAsyncProcess extends
        RichAsyncFunction<
                Tuple1<Map<String, List<String>>>,
                Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {

    ...

}


private OneInputStreamOperatorTestHarness createTestHarness(
        GetMetadataAsyncProcess asyncFunction,
        long timeout,
        int capacity,
        AsyncDataStream.OutputMode mode) throws Exception {
    OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness =
            new OneInputStreamOperatorTestHarness<>(
                    new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode),
                    <TYPESERIALISER_TO_FILL?>);
    return tuple1OUTOneInputStreamOperatorTestHarness;
}


Creating the harness without passing a TypeSerializer results in the
following error:

java.lang.NullPointerException
        at java.util.Objects.requireNonNull(Objects.java:203)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
        at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
