Hi Arpith,

Looking at the definition of GetMetadataAsyncProcess, the test harness needs
a TypeSerializer for the input type, Tuple1<Map<String, List<String>>>. To
avoid creating the serializer manually, you could try:

TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>(){})
    .createSerializer(new ExecutionConfig())

This should hopefully create the correct serializer.
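
As a rough sketch of how this could be wrapped up for the test (the class
name InputSerializerSketch and the method createInputSerializer are made up
for illustration, and this assumes a Flink version in which
createSerializer(ExecutionConfig) is available):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;

import java.util.List;
import java.util.Map;

// Hypothetical helper class; neither the class nor the method name comes
// from Flink or from the original test code.
public class InputSerializerSketch {

    // Builds a serializer for the async function's input type. The anonymous
    // TypeHint subclass lets Flink capture the full generic signature.
    static TypeSerializer<Tuple1<Map<String, List<String>>>> createInputSerializer() {
        TypeInformation<Tuple1<Map<String, List<String>>>> inputType =
                TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>() {});
        return inputType.createSerializer(new ExecutionConfig());
    }

    public static void main(String[] args) {
        // Prints whichever serializer Flink derived for the input type.
        System.out.println(createInputSerializer());
    }
}
```

The value returned by createInputSerializer() would then go where your
snippet has the <TYPESERIALISER_TO_FILL?> placeholder, as the second
constructor argument of OneInputStreamOperatorTestHarness.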

Cheers,
Till



On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <arpithte...@gmail.com> wrote:

> Hi,
>
> I tried mocking an async operator that takes Tuple1 as input and Tuple3 as
> output, but while creating a test harness I couldn't find the right
> TupleSerializer. Can anyone help me with this?
>
>
> public class GetMetadataAsyncProcess extends RichAsyncFunction<
>       Tuple1<Map<String, List<String>>>,
>       Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
>
> ...
>
> }
>
>
> private OneInputStreamOperatorTestHarness createTestHarness(
>       GetMetadataAsyncProcess asyncFunction, long timeout, int capacity,
>       AsyncDataStream.OutputMode mode) throws Exception {
>    OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness =
>       new OneInputStreamOperatorTestHarness<>(
>          new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode),
>          <TYPESERIALISER_TO_FILL?>);
>    return tuple1OUTOneInputStreamOperatorTestHarness;
> }
>
>
> Creating the harness without passing a TypeSerializer results in the
> following error:
>
> java.lang.NullPointerException
>       at java.util.Objects.requireNonNull(Objects.java:203)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
>       at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
>       at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
>
