dannycranmer commented on code in PR #138: URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570265098
##########
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
         }
     }
 
+    @Test
+    public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+        // ----------------------------------------------------------------------
+        // setup config, initial state and expected state snapshot
+        // ----------------------------------------------------------------------
+        Properties config = TestUtils.getStandardProperties();
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+        initialState.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("11")));
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+                new ArrayList<>(1);
+        expectedStateSnapshot1.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("12")));
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+                new ArrayList<>(1);
+        expectedStateSnapshot2.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("13")));
+
+        // ----------------------------------------------------------------------
+        // mock operator state backend and initial state for initializeState()
+        // ----------------------------------------------------------------------
+
+        TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+                new TestingListState<>();
+        for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+            listState.add(state);
+        }
+
+        OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
Since Mockito is already used in this class, and this connector is on the deprecation path, I am OK with using it. However, the test is very long and it is hard to follow what is going on. Can we break it out into methods to reduce the complexity of the test case and improve readability?
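
To illustrate the kind of extraction suggested above, here is a minimal sketch. The three near-identical state lists in the test differ only in the sequence number, so a single helper could build them. Everything in this sketch is a hypothetical stand-in so that it compiles without Flink on the classpath: `ShardState` stands in for `Tuple2<StreamShardMetadata, SequenceNumber>`, `shardIdFromOrder` stands in for `KinesisShardIdGenerator.generateFromShardOrder` (its output format here is an assumption), and `singleShardState` is an illustrative helper name, not something that exists in the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotStateFixtures {

    /** Hypothetical stand-in for Tuple2<StreamShardMetadata, SequenceNumber>. */
    public static final class ShardState {
        public final String streamName;
        public final String shardId;
        public final String sequenceNumber;

        public ShardState(String streamName, String shardId, String sequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }

        @Override
        public String toString() {
            return streamName + "/" + shardId + "@" + sequenceNumber;
        }
    }

    public static final String STREAM_NAME = "fakeStream1";

    /** Stand-in for the shard id generator; the format is assumed for illustration. */
    public static String shardIdFromOrder(int order) {
        return String.format("shardId-%012d", order);
    }

    /** Builds a one-element state list; only the sequence number varies per call. */
    public static List<ShardState> singleShardState(int shardOrder, String sequenceNumber) {
        List<ShardState> state = new ArrayList<>(1);
        state.add(new ShardState(STREAM_NAME, shardIdFromOrder(shardOrder), sequenceNumber));
        return state;
    }

    public static void main(String[] args) {
        // The three blocks from the test body collapse into three one-line calls.
        List<ShardState> initialState = singleShardState(0, "11");
        List<ShardState> expectedStateSnapshot1 = singleShardState(0, "12");
        List<ShardState> expectedStateSnapshot2 = singleShardState(0, "13");

        System.out.println(initialState);
        System.out.println(expectedStateSnapshot1);
        System.out.println(expectedStateSnapshot2);
    }
}
```

In the real test the helper would wrap the existing `KinesisDataFetcher.convertToStreamShardMetadata(...)` and `new SequenceNumber(...)` calls instead of the stand-ins, and the mock setup for the operator state store could be moved into a second helper in the same way.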