This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new de4a1b9  [FLINK-11523] Use OutputTag.typeInfo in harness
de4a1b9 is described below

commit de4a1b9a70c170f737283f642eea851d262bbd59
Author: Alexey Trenikhin <alexey.trenik...@genesys.com>
AuthorDate: Mon Apr 6 19:34:23 2020 -0700

    [FLINK-11523] Use OutputTag.typeInfo in harness
    
    The side-output collector in AbstractStreamOperatorTestHarness now uses the
    TypeSerializer returned by
    `outputTag.getTypeInfo().createSerializer(executionConfig)` instead of one
    derived from the record value via TypeExtractor.
    
    This closes #11695
---
 .../util/AbstractStreamOperatorTestHarness.java    |  2 +-
 .../AbstractStreamOperatorTestHarnessTest.java     | 48 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 294388b..8141126 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -732,7 +732,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
                @Override
                public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-                       sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig);
+                       sideOutputSerializer = outputTag.getTypeInfo().createSerializer(executionConfig);
 
                        ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag);
                        if (sideOutputList == null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
index 719adc1..fd3ae1f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -18,16 +18,24 @@
 
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -36,6 +44,12 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link AbstractStreamOperatorTestHarness}.
@@ -91,4 +105,38 @@ public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
                        Assert.assertNull(state.value());
                }
        }
+
+       @Test
+       public void testSideOutputTypeInformation() throws Throwable {
+               final int probe = 12;
+               final TypeSerializer<Integer> typeSerializer = spy(TypeSerializer.class);
+
+               final TypeInformation<Integer> typeInformation = spy(Types.INT);
+               when(typeInformation.createSerializer(any(ExecutionConfig.class))).thenReturn(typeSerializer);
+
+               final OutputTag<Integer> outputTag = new OutputTag<>("test", typeInformation);
+               final SideOutputTypeInformationTestFunction testFunction = new SideOutputTypeInformationTestFunction(outputTag);
+               final OneInputStreamOperatorTestHarness<Integer, Integer> result = new OneInputStreamOperatorTestHarness<>(
+                       new ProcessOperator<>(testFunction));
+               result.setup();
+               result.open();
+               result.processElement(probe, 1000);
+
+               // verify that AbstractStreamOperatorTestHarness called copy on serializer from OutputTag
+               verify(typeSerializer, times(1)).copy(eq(probe));
+       }
+
+       private static class SideOutputTypeInformationTestFunction extends ProcessFunction<Integer, Integer> {
+               private final OutputTag<Integer> outputTag;
+
+               SideOutputTypeInformationTestFunction(OutputTag<Integer> outputTag) {
+                       this.outputTag = outputTag;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+                       ctx.output(outputTag, value);
+               }
+       }
 }
+

Reply via email to