I would strongly recommend not using the harness for testing user functions.

Instead I'd just create an ITCase like this:

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(PARALLELISM)
                        .build());

// ------------------------------------------------------------------------

@Test
public void testAsyncFunction() throws Exception {
    final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(PARALLELISM);

    final DataStream<Long> stream = env.fromSequence(1L, 1_000L); //
or fromElements
    stream = AsyncDataStream.orderedWait(stream, ...);

    final List<Long> result = stream.executeAndCollect(10000);
    assertThat(result, containsInAnyOrder(LongStream.rangeClosed(1,
1000).boxed().toArray()));
}

See also
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62

On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> HI
>
> I am trying to use RichAsyncFunction with flink's test harness. My code
> looks like below
>
> final MetadataEnrichment.AsyncFlowLookup fn = new 
> MetadataEnrichment.AsyncFlowLookup();
> final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, 
> EnrichedMinTuple>> operator = new AsyncWaitOperatorFactory<>(fn, 2000, 1, 
> AsyncDataStream.OutputMode.ORDERED);
> final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, 
> EnrichedMinTuple>> harness = new 
> OneInputStreamOperatorTestHarness<>(operator);
> Configuration config = new Configuration();
> config.set(StoreOptions.CONFIG_STORE_TYPE, 
> ConfigStoreFactory.StoreType.MEMORY.name());
> harness.getExecutionConfig().setGlobalJobParameters(config);
> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
> harness.open();
>
> But harness.open() is throwing the below exception
>
> java.lang.IllegalStateException
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>       at 
> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>       at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>
> Can someone suggest what could be going wrong?
>
>
>

Reply via email to