Hi, I tried mocking an async operator that takes a Tuple1 as input and produces a Tuple3 as output, but while creating a test harness I couldn't find the right TupleSerializer. Can anyone help me with this?

    public class GetMetadataAsyncProcess extends RichAsyncFunction<
            Tuple1<Map<String, List<String>>>,
            Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
        ...
    }

    private OneInputStreamOperatorTestHarness createTestHarness(
            GetMetadataAsyncProcess asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode mode) throws Exception {

        OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode),
                        <TYPESERIALISER_TO_FILL?>);

        return tuple1OUTOneInputStreamOperatorTestHarness;
    }

Creating a harness without passing TypeSerializer results in the following error.

    java.lang.NullPointerException
        at java.util.Objects.requireNonNull(Objects.java:203)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
        at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)