[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838121#comment-17838121 ]
Vadim Vararu commented on FLINK-35115:
--------------------------------------

[~a.pilipenko] Yes, I can reproduce this consistently. I've enabled this logger:
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG
{code}
and these are the last logs on the TM before I trigger the stop-with-savepoint (the entry at 2024-04-17 14:05:11,753 is the last checkpoint):
{code:java}
2024-04-17 14:05:06,330 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotting state ...
2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotted state, last processed sequence numbers: {StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression', shardId='shardId-000000000000', parentShardId='null', adjacentParentShardId='null', startingHashKey='0', endingHashKey='340282366920938463463374607431768211455', startingSequenceNumber='49618213417511572504838906841289148356109207047268990978', endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554}, checkpoint id: 1, timestamp: 1713351911711
2024-04-17 14:05:16,652 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:26,930 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:27,032 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-000000000000, millis behind latest: 0, batch size: 120
2024-04-17 14:05:37,229 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:43,079 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-000000000000, millis behind latest: 0, batch size: 1
2024-04-17 14:05:47,752 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:50,677 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-000000000000, millis behind latest: 0, batch size: 1
{code}
Now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
2024-04-17 14:05:52,169 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Cancelled discovery
2024-04-17 14:05:52,169 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null.
2024-04-17 14:05:52,669 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
2024-04-17 14:05:52,670 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
{code}
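The "snapshotState() called on closed source; returning null." entry looks like the key: the fetcher shutdown apparently runs before the savepoint snapshot, so the snapshot is skipped. Here is a self-contained toy model of that suspected sequence of events (this is not the connector source; all names and the simplified state handling are made up for illustration):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy model of the suspected bug, NOT the actual FlinkKinesisConsumer code:
// if the source is already closed when snapshotState() runs, the snapshot is
// skipped and the savepoint keeps whatever the previous checkpoint recorded.
public class ClosedSourceSnapshotDemo {

    private volatile boolean running = true;
    private final Map<String, String> lastStateSnapshot = new HashMap<>();

    void snapshotState(String shardId, String currentSeqNo) {
        if (!running) {
            // Matches the observed log line; no fresh sequence number is captured.
            System.out.println("snapshotState() called on closed source; returning null.");
            return;
        }
        // Normal checkpoint path: record the latest per-shard sequence number.
        lastStateSnapshot.put(shardId, currentSeqNo);
    }

    void close() {
        running = false; // stop-with-savepoint shuts the source down first
    }

    public static void main(String[] args) {
        ClosedSourceSnapshotDemo source = new ClosedSourceSnapshotDemo();
        source.snapshotState("shardId-000000000000", "seq-at-checkpoint-1"); // periodic checkpoint
        source.close();                                                      // shutdown begins
        source.snapshotState("shardId-000000000000", "seq-at-savepoint");    // savepoint snapshot skipped
        System.out.println(source.lastStateSnapshot);                        // still seq-at-checkpoint-1
    }
}
{code}
If this matches the real flow, the savepoint would always carry the previous checkpoint's sequence numbers, which is exactly what the restore logs below show.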
And here I start from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}
2024-04-17 14:12:58,370 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 is seeding the fetcher with restored shard StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49618213417511572504838906841289148356109207047268990978,}}'}, starting state set to the restored sequence number 49646826022549514041791139259235973731492142339223191554
2024-04-17 14:12:58,371 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49618213417511572504838906841289148356109207047268990978,}}'} from sequence number 49646826022549514041791139259235973731492142339223191554 with ShardConsumer 0
2024-04-17 14:12:59,211 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:12:59,297 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-000000000000, millis behind latest: 0, batch size: 3
{code}
There are no other logs between the above actions. I did not dive into the source code, but it seems odd to me that a line of this type:
{code:java}
2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotted state, last processed sequence numbers: {StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression', shardId='shardId-000000000000', parentShardId='null', adjacentParentShardId='null', startingHashKey='0', endingHashKey='340282366920938463463374607431768211455', startingSequenceNumber='49618213417511572504838906841289148356109207047268990978', endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554}, checkpoint id: 1, timestamp: 1713351911711
{code}
was logged at the last checkpoint, while nothing similar appears at the stop-with-savepoint. I've filtered the output to only the logs under _org.apache.flink.streaming.connectors.kinesis_. Let me know if you need any other logs.
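If it helps with the investigation, the shard state recorded in the savepoint can also be read back directly with the State Processor API (requires the flink-state-processor-api dependency). A minimal sketch only: the savepoint path is a placeholder, "kinesis-source" assumes the source operator was given that uid, and "Kinesis-Stream-Shard-State" is my assumption about the connector's internal union-state name:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public class DumpKinesisShardState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path: point this at the savepoint produced by stop-with-savepoint.
        SavepointReader savepoint = SavepointReader.read(env, "file:///tmp/savepoints/savepoint-xxxx");

        // Read the consumer's union list state; uid and state name are assumptions.
        savepoint
                .readUnionState(
                        "kinesis-source",
                        "Kinesis-Stream-Shard-State",
                        TypeInformation.of(
                                new TypeHint<Tuple2<StreamShardMetadata, SequenceNumber>>() {}))
                .print();

        env.execute("dump-kinesis-shard-state");
    }
}
{code}
Comparing the printed sequence number against the "Snapshotted state" line from the last checkpoint would directly confirm that the savepoint carries the stale value.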
> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.3, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
> * Flink versions checked: 1.16.3 and 1.18.1
> * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
> * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> {code}
> * Kafka sink configured with EXACTLY_ONCE delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliveryGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build();
> {code}
> * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer =
>         new FlinkKinesisConsumer<>("test-stream",
>                 new AbstractDeserializationSchema<>() {
>                     @Override
>                     public ByteBuffer deserialize(byte[] bytes) {
>                         return ByteBuffer.wrap(bytes);
>                     }
>                 }, props);
> {code}
>
>             Reporter: Vadim Vararu
>             Assignee: Aleksandr Pilipenko
>             Priority: Major
>              Labels: kinesis
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint causes Flink to duplicate in Kafka, on resume, all the records between the last checkpoint and the savepoint:
> * Event1 is written to Kinesis
> * Event1 is processed by Flink
> * Event1 is committed to Kafka at the checkpoint
> * ............................................................................
> * Event2 is written to Kinesis
> * Event2 is processed by Flink
> * Stop with savepoint is triggered manually
> * Event2 is committed to Kafka
> * ............................................................................
> * Job is resumed from the savepoint
> * *{color:#FF0000}Event2 is written again to Kafka at the first checkpoint{color}*
>
> {color:#172b4d}I believe that it's a Kinesis connector issue, for 2 reasons:{color}
> * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint, and it is the one from the checkpoint before the savepoint instead of the one of the last record committed to Kafka.
> * I've tested exactly the same job with Kafka as the source instead of Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)