z3d1k commented on code in PR #138:
URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1571233482


##########
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
         }
     }
 
+    @Test
+    public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+        // ----------------------------------------------------------------------
+        // setup config, initial state and expected state snapshot
+        // ----------------------------------------------------------------------
+        Properties config = TestUtils.getStandardProperties();
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+        initialState.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("11")));
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+                new ArrayList<>(1);
+        expectedStateSnapshot1.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("12")));
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+                new ArrayList<>(1);
+        expectedStateSnapshot2.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("13")));
+
+        // ----------------------------------------------------------------------
+        // mock operator state backend and initial state for initializeState()
+        // ----------------------------------------------------------------------
+
+        TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+                new TestingListState<>();
+        for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+            listState.add(state);
+        }
+
+        OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   @vahmed-hamdy The existing test util does not currently support `getUnionListState`. As pointed out by @dannycranmer, this connector is on the deprecation path.
   
   I split the test into separate cases and extracted some of the common logic to reduce its size.
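
   For illustration only, here is a rough sketch of what extracting the common logic could look like for the three near-identical state lists in the hunk above. It is not the code from the PR: `SnapshotFixtures`, `ShardState`, `shardIdFromOrder` and `singleShardState` are hypothetical stand-ins so the example compiles without Flink, and the shard ID format is an assumption about what `KinesisShardIdGenerator.generateFromShardOrder` produces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the PR. It uses stand-in types so the
// sketch is self-contained; the real test works with Flink's
// Tuple2<StreamShardMetadata, SequenceNumber>.
final class SnapshotFixtures {

    // Simplified stand-in for a (shard metadata, sequence number) pair.
    static final class ShardState {
        final String streamName;
        final String shardId;
        final String sequenceNumber;

        ShardState(String streamName, String shardId, String sequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Stand-in for KinesisShardIdGenerator.generateFromShardOrder.
    // Assumption: shard IDs are "shardId-" followed by a 12-digit,
    // zero-padded shard order.
    static String shardIdFromOrder(int shardOrder) {
        return String.format("shardId-%012d", shardOrder);
    }

    // Builds the single-element state list that the hunk spells out three
    // times, differing only in the sequence number.
    static List<ShardState> singleShardState(
            String streamName, int shardOrder, String sequenceNumber) {
        List<ShardState> state = new ArrayList<>(1);
        state.add(new ShardState(streamName, shardIdFromOrder(shardOrder), sequenceNumber));
        return state;
    }
}
```

   With a helper of this shape, `initialState`, `expectedStateSnapshot1` and `expectedStateSnapshot2` would each reduce to one call that differs only in the sequence number ("11", "12", "13").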



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
