dannycranmer commented on code in PR #138:
URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570265098


##########
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
         }
     }
 
+    @Test
+    public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+        // ----------------------------------------------------------------------
+        // setup config, initial state and expected state snapshot
+        // ----------------------------------------------------------------------
+        Properties config = TestUtils.getStandardProperties();
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+        initialState.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("11")));
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+                new ArrayList<>(1);
+        expectedStateSnapshot1.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("12")));
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+                new ArrayList<>(1);
+        expectedStateSnapshot2.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("13")));
+
+        // ----------------------------------------------------------------------
+        // mock operator state backend and initial state for initializeState()
+        // ----------------------------------------------------------------------
+
+        TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+                new TestingListState<>();
+        for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+            listState.add(state);
+        }
+
+        OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   Since Mockito is already used in this class, and this connector is on the 
deprecation path, I am OK with using it here. However, the test is very long 
and it is hard to follow what is going on. Can we break it out into helper 
methods to reduce the complexity of the test case and improve readability?
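
   As a rough sketch of one possible extraction: the three state lists in the 
test differ only in their sequence number ("11", "12", "13"), so a single 
helper could build all of them. The snippet below is illustrative only. 
`ShardState` is a placeholder type and `singleShardState` is a hypothetical 
helper name, chosen so that the example compiles without the connector on the 
classpath. In the real test the helper would presumably return 
`Tuple2<StreamShardMetadata, SequenceNumber>` entries built with 
`KinesisDataFetcher.convertToStreamShardMetadata(...)`, as in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: String-based placeholders stand in for the
// connector's StreamShardMetadata and SequenceNumber types.
class SnapshotFixtureSketch {

    // Placeholder for Tuple2<StreamShardMetadata, SequenceNumber>.
    static final class ShardState {
        final String stream;
        final String shardId;
        final String sequenceNumber;

        ShardState(String stream, String shardId, String sequenceNumber) {
            this.stream = stream;
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Hypothetical helper: builds the one-element state list that the test
    // currently spells out three times.
    static List<ShardState> singleShardState(
            String stream, int shardOrder, String sequenceNumber) {
        List<ShardState> state = new ArrayList<>(1);
        // Placeholder shard id. The real test would call
        // KinesisShardIdGenerator.generateFromShardOrder(shardOrder) here.
        state.add(new ShardState(stream, "shard-" + shardOrder, sequenceNumber));
        return state;
    }

    public static void main(String[] args) {
        // The setup section of the test shrinks to three lines.
        List<ShardState> initialState = singleShardState("fakeStream1", 0, "11");
        List<ShardState> expectedStateSnapshot1 = singleShardState("fakeStream1", 0, "12");
        List<ShardState> expectedStateSnapshot2 = singleShardState("fakeStream1", 0, "13");

        System.out.println(
                initialState.get(0).sequenceNumber
                        + " "
                        + expectedStateSnapshot1.get(0).sequenceNumber
                        + " "
                        + expectedStateSnapshot2.get(0).sequenceNumber);
    }
}
```

   The same approach could be applied to the mock operator state store setup, 
so that the test body reads as setup, action, and assertions.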
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.