This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new b996d1d  [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer
b996d1d is described below

commit b996d1d854a221787cfbb46802b9f3c5bb0e6651
Author: Alexander Egorov <cybern...@gmail.com>
AuthorDate: Mon Sep 11 06:10:27 2023 -0500

    [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   | 10 ++--
 .../connectors/kinesis/util/KinesisStateUtil.java  | 56 ++++++++++++++++++++
 .../kinesis/FlinkKinesisConsumerTest.java          | 61 ++++++++++++++++++++--
 .../testutils/TestableFlinkKinesisConsumer.java    | 29 ++--------
 4 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3647176..c229a1c 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -47,6 +46,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
@@ -441,16 +441,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo =
-                new TupleTypeInfo<>(
-                        TypeInformation.of(StreamShardMetadata.class),
-                        TypeInformation.of(SequenceNumber.class));
 
         sequenceNumsStateForCheckpoint =
                 context.getOperatorStateStore()
                         .getUnionListState(
                                 new ListStateDescriptor<>(
-                                        sequenceNumsStateStoreName, shardsStateTypeInfo));
+                                        sequenceNumsStateStoreName,
+                                        KinesisStateUtil.createShardsStateSerializer(
+                                                getRuntimeContext().getExecutionConfig())));
 
         if (context.isRestored()) {
             if (sequenceNumsToRestore == null) {
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
new file mode 100644
index 0000000..eba4440
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
+
+/** Utilities for Flink Kinesis connector state management. */
+public class KinesisStateUtil {
+
+    /** To prevent instantiation of class. */
+    private KinesisStateUtil() {}
+
+    /**
+     * Creates the state serializer for the Kinesis shard sequence number. An explicit state
+     * serializer backed by KryoSerializer is needed because otherwise users cannot use
+     * 'disableGenericTypes' with the KinesisConsumer; see FLINK-24943 for details.
+     *
+     * @return state serializer
+     */
+    public static TupleSerializer<Tuple2<StreamShardMetadata, SequenceNumber>>
+            createShardsStateSerializer(ExecutionConfig executionConfig) {
+        // An explicit serializer keeps compatibility with GenericTypeInformation
+        // and allows users to disableGenericTypes
+        TypeSerializer<?>[] fieldSerializers =
+                new TypeSerializer<?>[] {
+                    TypeInformation.of(StreamShardMetadata.class).createSerializer(executionConfig),
+                    new KryoSerializer<>(SequenceNumber.class, executionConfig)
+                };
+        @SuppressWarnings("unchecked")
+        Class<Tuple2<StreamShardMetadata, SequenceNumber>> tupleClass =
+                (Class<Tuple2<StreamShardMetadata, SequenceNumber>>) (Class<?>) Tuple2.class;
+        return new TupleSerializer<>(tupleClass, fieldSerializers);
+    }
+}
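
For context (not part of the commit): with the explicit serializer above, a job can call disableGenericTypes() and still use the Kinesis consumer. Below is a minimal sketch of such a setup, assuming the standard connector APIs; the stream name, region, and class name are placeholders.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisWithoutGenericTypes {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Before this change, disabling generic types made the consumer fail, because its
            // shard state serializer for SequenceNumber was created implicitly as a generic
            // (Kryo) type.
            env.getConfig().disableGenericTypes();

            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder

            env.addSource(
                            new FlinkKinesisConsumer<>(
                                    "example-stream", new SimpleStringSchema(), consumerConfig))
                    .print();

            env.execute("Kinesis consumer with generic types disabled");
        }
    }
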
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 7e12c53..d873c4e 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -24,7 +25,11 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -51,11 +56,14 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.types.PojoTestUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
@@ -158,9 +166,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 
         FlinkKinesisConsumer<String> consumer =
                 new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
-        RuntimeContext context = mock(RuntimeContext.class);
-        when(context.getIndexOfThisSubtask()).thenReturn(0);
-        when(context.getNumberOfParallelSubtasks()).thenReturn(2);
+        RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0);
         consumer.setRuntimeContext(context);
 
         OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -278,8 +284,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                 new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
         FlinkKinesisConsumer<?> mockedConsumer = spy(consumer);
 
-        RuntimeContext context = mock(RuntimeContext.class);
-        when(context.getIndexOfThisSubtask()).thenReturn(1);
+        RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
 
         mockedConsumer.setRuntimeContext(context);
         mockedConsumer.initializeState(initializationContext);
@@ -307,6 +312,52 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         }
     }
 
+    /**
+     * Before an explicit TypeSerializer was used for the state, the {@link FlinkKinesisConsumer}
+     * created the serializer implicitly from a {@link TypeInformation}. After fixing issue
+     * FLINK-24943, the serializer is created explicitly. Here, we verify that the previous
+     * approach is compatible with the new one.
+     */
+    @Test
+    public void testExplicitStateSerializerCompatibility() throws Exception {
+        ExecutionConfig executionConfig = new ExecutionConfig();
+
+        Tuple2<StreamShardMetadata, SequenceNumber> tuple =
+                new Tuple2<>(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("1"));
+
+        // This is how the serializer was created implicitly using a TypeInformation;
+        // since SequenceNumber is a GenericType, Flink falls back to Kryo
+        TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> originalShardsStateTypeInfo =
+                new TupleTypeInfo<>(
+                        TypeInformation.of(StreamShardMetadata.class),
+                        TypeInformation.of(SequenceNumber.class));
+        TypeSerializer<Tuple2<StreamShardMetadata, SequenceNumber>> serializerFromTypeInfo =
+                originalShardsStateTypeInfo.createSerializer(executionConfig);
+        byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeInfo, tuple);
+
+        // This is how the serializer is now created explicitly with Kryo
+        TupleSerializer<Tuple2<StreamShardMetadata, SequenceNumber>> serializerFromKryo =
+                KinesisStateUtil.createShardsStateSerializer(executionConfig);
+
+        Tuple2<StreamShardMetadata, SequenceNumber> actualTuple =
+                InstantiationUtil.deserializeFromByteArray(serializerFromKryo, bytes);
+
+        // Both approaches should produce the same result
+        assertThat(tuple)
+                .overridingErrorMessage(
+                        "Explicit serializer is not compatible with "
+                                + "implicit method of creating serializer using TypeInformation.")
+                .isEqualTo(actualTuple);
+    }
+
     // ----------------------------------------------------------------------
     // Tests related to fetcher initialization
     // ----------------------------------------------------------------------
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
index 5be5f32..fc7aca5 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -20,10 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
 import java.util.Properties;
 
@@ -39,27 +36,9 @@ public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer<String> {
             final int indexOfThisConsumerSubtask) {
         super(fakeStream, new SimpleStringSchema(), fakeConfiguration);
 
-        this.mockedRuntimeCtx = Mockito.mock(RuntimeContext.class);
-
-        Mockito.when(mockedRuntimeCtx.getNumberOfParallelSubtasks())
-                .thenAnswer(
-                        new Answer<Integer>() {
-                            @Override
-                            public Integer answer(InvocationOnMock invocationOnMock)
-                                    throws Throwable {
-                                return totalNumOfConsumerSubtasks;
-                            }
-                        });
-
-        Mockito.when(mockedRuntimeCtx.getIndexOfThisSubtask())
-                .thenAnswer(
-                        new Answer<Integer>() {
-                            @Override
-                            public Integer answer(InvocationOnMock invocationOnMock)
-                                    throws Throwable {
-                                return indexOfThisConsumerSubtask;
-                            }
-                        });
+        this.mockedRuntimeCtx =
+                new MockStreamingRuntimeContext(
+                        true, totalNumOfConsumerSubtasks, indexOfThisConsumerSubtask);
     }
 
     @Override
