Hi Arpith, looking at the definition of the GetMetadataAsyncProcess function you need to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>. What you could try in order to not create the serializer manually is to use:
TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>(){}).createSerializer(new ExecutionConfig()) This should hopefully create the correct serializer. Cheers, Till On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <arpithte...@gmail.com> wrote: > Hi, > > I tried mocking the Async operator which takes Tuple1, Tuple3 as Input & > Output but while creating a test harness I couldn't find the right > TupleSerializer. Can anyone help me on this? > > > public class GetMetadataAsyncProcess extends > RichAsyncFunction<Tuple1<Map<String, List<String>>>, Tuple3<Map<String, > List<String>>, Map<String, String>, List<Map<String, Integer>>>> { > > ... > > } > > > private OneInputStreamOperatorTestHarness > createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout, int > capacity, AsyncDataStream.OutputMode mode) throws Exception { > OneInputStreamOperatorTestHarness > tuple1OUTOneInputStreamOperatorTestHarness = new > OneInputStreamOperatorTestHarness<> > (new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode), > <TYPESERIALISER_TO_FILL?>); > return tuple1OUTOneInputStreamOperatorTestHarness; > } > > > Creating a harness without passing TypeSerializer results in the following > error. > > java.lang.NullPointerException > at java.util.Objects.requireNonNull(Objects.java:203) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64) > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479) > at > GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40) > >