[FLINK-6808] Implement snapshotConfiguration/ensureCompatibility for 
CoGroupedStreams.UnionSerializer

This closes #4052.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/539787b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/539787b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/539787b2

Branch: refs/heads/master
Commit: 539787b21822eb839d0408a989cd541450bd08d2
Parents: 4895472
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Fri Jun 2 15:15:32 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 60 ++++++++++++++++++--
 flink-tests/pom.xml                             |  1 +
 .../streaming/runtime/CoGroupJoinITCase.java    | 47 +++++++++++++++
 3 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index d112260..8dad1cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -25,10 +25,15 @@ import 
org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -77,7 +82,7 @@ public class CoGroupedStreams<T1, T2> {
        private final DataStream<T2> input2;
 
        /**
-        * Creates new CoGroped data streams, which are the first step towards 
building a streaming
+        * Creates new CoGrouped data streams, which are the first step towards 
building a streaming
         * co-group.
         *
         * @param input1 The first data stream.
@@ -443,8 +448,7 @@ public class CoGroupedStreams<T1, T2> {
                private final TypeSerializer<T1> oneSerializer;
                private final TypeSerializer<T2> twoSerializer;
 
-               public UnionSerializer(TypeSerializer<T1> oneSerializer,
-                               TypeSerializer<T2> twoSerializer) {
+               public UnionSerializer(TypeSerializer<T1> oneSerializer, 
TypeSerializer<T2> twoSerializer) {
                        this.oneSerializer = oneSerializer;
                        this.twoSerializer = twoSerializer;
                }
@@ -553,12 +557,58 @@ public class CoGroupedStreams<T1, T2> {
 
                @Override
                public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+                       return new 
UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
                }
 
                @Override
                public CompatibilityResult<TaggedUnion<T1, T2>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+                       if (configSnapshot instanceof 
UnionSerializerConfigSnapshot) {
+                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
+                                       ((UnionSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                               CompatibilityResult<T1> 
oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+                                       previousSerializersAndConfigs.get(0).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousSerializersAndConfigs.get(0).f1,
+                                       oneSerializer);
+
+                               CompatibilityResult<T2> 
twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+                                       previousSerializersAndConfigs.get(1).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousSerializersAndConfigs.get(1).f1,
+                                       twoSerializer);
+
+                               if 
(!oneSerializerCompatResult.isRequiresMigration() && 
!twoSerializerCompatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else if 
(oneSerializerCompatResult.getConvertDeserializer() != null && 
twoSerializerCompatResult.getConvertDeserializer() != null) {
+                                       return 
CompatibilityResult.requiresMigration(
+                                               new UnionSerializer<>(
+                                                       new 
TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
+                                                       new 
TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
+                               }
+                       }
+
+                       return CompatibilityResult.requiresMigration();
+               }
+       }
+
+       /**
+        * The {@link TypeSerializerConfigSnapshot} for the {@link 
UnionSerializer}.
+        */
+       public static class UnionSerializerConfigSnapshot<T1, T2> extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public UnionSerializerConfigSnapshot() {}
+
+               public UnionSerializerConfigSnapshot(TypeSerializer<T1> 
oneSerializer, TypeSerializer<T2> twoSerializer) {
+                       super(oneSerializer, twoSerializer);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 3c0b184..dd6e949 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -115,6 +115,7 @@ under the License.
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
+                       <type>test-jar</type>
                </dependency>
 
                <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
index da3de3d..a82b965 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -19,18 +19,24 @@ package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -324,6 +330,47 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                Assert.assertEquals(expectedResult, testResults);
        }
 
+       /**
+        * Verifies that pipelines including {@link CoGroupedStreams} can be 
checkpointed properly,
+        * which includes snapshotting configurations of any involved 
serializers.
+        *
+        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a>
+        */
+       @Test
+       public void testCoGroupOperatorWithCheckpoint() throws Exception {
+
+               // generate an operator for the co-group operation
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               DataStream<Tuple2<String, Integer>> source1 = 
env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3));
+               DataStream<Tuple2<String, Integer>> source2 = 
env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6));
+
+               DataStream<String> coGroupWindow = source1.coGroup(source2)
+                       .where(new Tuple2KeyExtractor())
+                       .equalTo(new Tuple2KeyExtractor())
+                       .window(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                       .apply(new CoGroupFunction<Tuple2<String,Integer>, 
Tuple2<String,Integer>, String>() {
+                               @Override
+                               public void coGroup(Iterable<Tuple2<String, 
Integer>> first,
+                                                                       
Iterable<Tuple2<String, Integer>> second,
+                                                                       
Collector<String> out) throws Exception {
+                                       out.collect(first + ":" + second);
+                               }
+                       });
+
+               OneInputTransformation<Tuple2<String, Integer>, String> 
transform = (OneInputTransformation<Tuple2<String, Integer>, String>) 
coGroupWindow.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, String> 
operator = transform.getOperator();
+
+               // wrap the operator in the test harness, and perform a snapshot
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+               testHarness.snapshot(0L, 0L);
+       }
+
        private static class Tuple2TimestampExtractor implements 
AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
                
                @Override

Reply via email to