This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new de4a1b9 [FLINK-11523] Use OutputTag.typeInfo in harness de4a1b9 is described below commit de4a1b9a70c170f737283f642eea851d262bbd59 Author: Alexey Trenikhin <alexey.trenik...@genesys.com> AuthorDate: Mon Apr 6 19:34:23 2020 -0700 [FLINK-11523] Use OutputTag.typeInfo in harness AbstractStreamOperatorTestHarness side output uses TypeSerializer returned by `outputTag.getTypeInfo().createSerializer(executionConfig)`. This closes #11695 --- .../util/AbstractStreamOperatorTestHarness.java | 2 +- .../AbstractStreamOperatorTestHarnessTest.java | 48 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 294388b..8141126 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -732,7 +732,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig); + sideOutputSerializer = outputTag.getTypeInfo().createSerializer(executionConfig); ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag); if (sideOutputList == null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java index 719adc1..fd3ae1f 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java @@ -18,16 +18,24 @@ package org.apache.flink.streaming.util; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -36,6 +44,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import static org.hamcrest.CoreMatchers.containsString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link AbstractStreamOperatorTestHarness}. 
@@ -91,4 +105,38 @@ public class AbstractStreamOperatorTestHarnessTest extends TestLogger { Assert.assertNull(state.value()); } } + + @Test + public void testSideOutputTypeInformation() throws Throwable { + final int probe = 12; + final TypeSerializer<Integer> typeSerializer = spy(TypeSerializer.class); + + final TypeInformation<Integer> typeInformation = spy(Types.INT); + when(typeInformation.createSerializer(any(ExecutionConfig.class))).thenReturn(typeSerializer); + + final OutputTag<Integer> outputTag = new OutputTag<>("test", typeInformation); + final SideOutputTypeInformationTestFunction testFunction = new SideOutputTypeInformationTestFunction(outputTag); + final OneInputStreamOperatorTestHarness<Integer, Integer> result = new OneInputStreamOperatorTestHarness<>( + new ProcessOperator<>(testFunction)); + result.setup(); + result.open(); + result.processElement(probe, 1000); + + // verify that AbstractStreamOperatorTestHarness called copy on serializer from OutputTag + verify(typeSerializer, times(1)).copy(eq(probe)); + } + + private static class SideOutputTypeInformationTestFunction extends ProcessFunction<Integer, Integer> { + private final OutputTag<Integer> outputTag; + + SideOutputTypeInformationTestFunction(OutputTag<Integer> outputTag) { + this.outputTag = outputTag; + } + + @Override + public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { + ctx.output(outputTag, value); + } + } } +