z3d1k commented on code in PR #138: URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1571233482
########## flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception { } } + @Test + public void testSnapshotStateDuringStopWithSavepoint() throws Exception { + + // ---------------------------------------------------------------------- + // setup config, initial state and expected state snapshot + // ---------------------------------------------------------------------- + Properties config = TestUtils.getStandardProperties(); + + ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1); + initialState.add( + Tuple2.of( + KinesisDataFetcher.convertToStreamShardMetadata( + new StreamShardHandle( + "fakeStream1", + new Shard() + .withShardId( + KinesisShardIdGenerator + .generateFromShardOrder(0)))), + new SequenceNumber("11"))); + + ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 = + new ArrayList<>(1); + expectedStateSnapshot1.add( + Tuple2.of( + KinesisDataFetcher.convertToStreamShardMetadata( + new StreamShardHandle( + "fakeStream1", + new Shard() + .withShardId( + KinesisShardIdGenerator + .generateFromShardOrder(0)))), + new SequenceNumber("12"))); + ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 = + new ArrayList<>(1); + expectedStateSnapshot2.add( + Tuple2.of( + KinesisDataFetcher.convertToStreamShardMetadata( + new StreamShardHandle( + "fakeStream1", + new Shard() + .withShardId( + KinesisShardIdGenerator + .generateFromShardOrder(0)))), + new SequenceNumber("13"))); + + // ---------------------------------------------------------------------- + // mock operator state backend and initial state for initializeState() + // ---------------------------------------------------------------------- + + TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = + new TestingListState<>(); + for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) { + listState.add(state); + } + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); Review Comment: @vahmed-hamdy existing test util currently does not support `getUnionListState`. As pointed by @dannycranmer, this connector is on deprecation path. Split test into separate cases and extracted some of the common logic to reduce size. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org