vahmed-hamdy commented on code in PR #138:
URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570201177


##########
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
         }
     }
 
+    @Test
+    public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+        // ----------------------------------------------------------------------
+        // setup config, initial state and expected state snapshot
+        // ----------------------------------------------------------------------
+        Properties config = TestUtils.getStandardProperties();
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+        initialState.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("11")));
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+                new ArrayList<>(1);
+        expectedStateSnapshot1.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("12")));
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+                new ArrayList<>(1);
+        expectedStateSnapshot2.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("13")));
+
+        // ----------------------------------------------------------------------
+        // mock operator state backend and initial state for initializeState()
+        // ----------------------------------------------------------------------
+
+        TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+                new TestingListState<>();
+        for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+            listState.add(state);
+        }
+
+        OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   Using Mockito is against the [coding standards](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations). I am aware that the standard is already broken in this test suite, but I believe we shouldn't add more debt.
   Can we try using an [existing test util](https://github.com/apache/flink/blob/43a3d50ce3982b9abf04b81407fed46c5c25f819/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java#L34) instead?
   
   We could also extend the existing `TestableFlinkKinesisConsumer` hierarchy.
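
   To illustrate the general idea of a reusable test implementation replacing `mock(...)`, here is a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `SimpleListState`, `SimpleStateStore`, `InMemoryStateStore` and `snapshot` are hypothetical stand-ins so the snippet compiles without Flink on the classpath. They are not Flink's `ListState` or `OperatorStateStore`, and in the actual test the linked `MockOperatorStateStore` would be the thing to try first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusableTestStoreSketch {

    /** Hypothetical, simplified list-state handle; NOT Flink's ListState. */
    public interface SimpleListState<T> {
        void add(T value);
        void clear();
        List<T> get();
    }

    /** Hypothetical, simplified state store; NOT Flink's OperatorStateStore. */
    public interface SimpleStateStore {
        <T> SimpleListState<T> getListState(String name);
    }

    /** Reusable in-memory test implementation: plain code, no stubbing needed. */
    public static final class InMemoryListState<T> implements SimpleListState<T> {
        private final List<T> values = new ArrayList<>();

        @Override public void add(T value) { values.add(value); }
        @Override public void clear() { values.clear(); }
        @Override public List<T> get() { return new ArrayList<>(values); }
    }

    public static final class InMemoryStateStore implements SimpleStateStore {
        private final Map<String, SimpleListState<?>> states = new HashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <T> SimpleListState<T> getListState(String name) {
            // Return the same handle for the same name, creating it on first use.
            return (SimpleListState<T>)
                    states.computeIfAbsent(name, n -> new InMemoryListState<Object>());
        }
    }

    /** Toy "snapshot": replaces the stored entries with the current ones. */
    public static void snapshot(SimpleStateStore store, String name, List<String> current) {
        SimpleListState<String> state = store.getListState(name);
        state.clear();
        for (String entry : current) {
            state.add(entry);
        }
    }

    public static void main(String[] args) {
        InMemoryStateStore store = new InMemoryStateStore();
        store.<String>getListState("shard-state").add("11"); // restored state
        snapshot(store, "shard-state", Arrays.asList("12")); // state after progress
        System.out.println(store.<String>getListState("shard-state").get()); // prints [12]
    }
}
```

   The test then asserts on what the in-memory store actually holds, rather than configuring `when(...)`/`thenReturn(...)` expectations, and the same class can be shared by other tests in the suite.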



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
