[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838894#comment-17838894 ]

Vadim Vararu commented on FLINK-35115:
--------------------------------------

Great, thanks for the quick fix (y)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
>  * Flink versions checked: 1.16.3 and 1.18.1
>  * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment ();
> execEnv.enableCheckpointing (6, EXACTLY_ONCE);
> execEnv.getCheckpointConfig ().setCheckpointTimeout (9);
> execEnv.getCheckpointConfig ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
>
> KafkaSink<String> sink = KafkaSink.<String>builder ()
>         .setBootstrapServers ("localhost:9092")
>         .setTransactionalIdPrefix ("test-prefix")
>         .setDeliverGuarantee (EXACTLY_ONCE)
>         .setKafkaProducerConfig (sinkConfig)
>         .setRecordSerializer (
>                 (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>                         new ProducerRecord<> ("test-output-topic", null, element.getBytes ()))
>         .build (); {code}
>  * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<> ("test-stream",
>         new AbstractDeserializationSchema<> () {
>             @Override
>             public ByteBuffer deserialize (byte[] bytes) {
>                 return ByteBuffer.wrap (bytes);
>             }
>         }, props); {code}
>
>            Reporter: Vadim Vararu
>            Assignee: Aleksandr Pilipenko
>            Priority: Blocker
>              Labels: kinesis, pull-request-available
>             Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a stop-with-savepoint, Flink duplicates in Kafka all the records between the last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink
>  * Event1 is committed to Kafka at the checkpoint
>
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>
>  * Job is resumed from the savepoint
>  * *{color:#FF0000}Event2 is written again to Kafka at the first checkpoint{color}*
>
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint, and it's the one from the checkpoint before the savepoint instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as source instead of Kinesis, and the behaviour does not reproduce.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
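The Event1/Event2 duplication sequence described in the report can be sketched as a tiny stand-alone simulation (plain Java, no Flink or Kafka involved; all names are illustrative, not connector APIs): the savepoint stores the offset of the last completed checkpoint rather than the offset of the last record the sink committed, so the record in between is replayed on resume.

```java
import java.util.ArrayList;
import java.util.List;

// Toy replay model of the reported scenario. "source" holds events by offset;
// "sink" collects everything committed to the downstream system.
class ReplaySim {
    static List<String> run() {
        List<String> source = List.of("Event1", "Event2");
        List<String> sink = new ArrayList<>();

        // Event1 is processed and committed at the periodic checkpoint;
        // the checkpoint records that offsets < 1 are done.
        sink.add(source.get(0));
        int checkpointedOffset = 1;

        // Event2 is processed; stop-with-savepoint commits it to the sink,
        // but the savepoint persists the stale offset from the checkpoint.
        sink.add(source.get(1));
        int savepointOffset = checkpointedOffset; // the bug: should be 2

        // Resume from the savepoint: Event2 is read and committed again.
        for (int i = savepointOffset; i < source.size(); i++) {
            sink.add(source.get(i));
        }
        return sink; // [Event1, Event2, Event2] -- Event2 duplicated
    }
}
```

With a correct savepoint offset of 2, the replay loop would be empty and the sink would hold each event exactly once.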
[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ]

Vadim Vararu edited comment on FLINK-35115 at 4/19/24 4:54 AM:
---------------------------------------------------------------

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?

was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ]

Vadim Vararu commented on FLINK-35115:
--------------------------------------

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115 ]

Vadim Vararu deleted comment on FLINK-35115:
---------------------------------------------

was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853 ]

Vadim Vararu commented on FLINK-35115:
--------------------------------------

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838121#comment-17838121 ]

Vadim Vararu commented on FLINK-35115:
--------------------------------------

[~a.pilipenko] Yes, I can reproduce this consistently. I've enabled this logger:
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG {code}
and got these last logs on the TM before triggering the stop-with-savepoint (the log at 2024-04-17 14:05:11,753 is the last checkpoint):
{code:java}
2024-04-17 14:05:06,330 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotting state ...
2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotted state, last processed sequence numbers: {StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression', shardId='shardId-', parentShardId='null', adjacentParentShardId='null', startingHashKey='0', endingHashKey='340282366920938463463374607431768211455', startingSequenceNumber='49618213417511572504838906841289148356109207047268990978', endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554}, checkpoint id: 1, timestamp: 1713351911711
2024-04-17 14:05:16,652 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:26,930 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:27,032 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 120
2024-04-17 14:05:37,229 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:43,079 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 1
2024-04-17 14:05:47,752 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ...
2024-04-17 14:05:50,677 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 1{code}
now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
2024-04-17 14:05:52,169 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Cancelled discovery
2024-04-17 14:05:52,169 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null.
2024-04-17 14:05:52,669 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
2024-04-17 14:05:52,670 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ... {code}
and here I start from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}
2024-04-17 14:12:58,370 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 is seeding the fetcher with restored shard StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49618213417511572504838906841289148356109207047268990978,}}'}, starting
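The `snapshotState() called on closed source; returning null.` entry above is the telling one: stop-with-savepoint shuts the shard consumers down before the savepoint snapshot runs, so the snapshot cannot capture the latest sequence number. A minimal stand-alone mock of that failure mode (plain Java; the class and method names here are illustrative, not the connector's actual internals):

```java
// Mock source mirroring the log line "snapshotState() called on closed
// source; returning null.": once closed, it can no longer report the
// sequence number of the last record it emitted.
class MockSource {
    private long lastEmittedSeqNo;
    private boolean closed;

    void emit(long seqNo) { lastEmittedSeqNo = seqNo; }
    void close() { closed = true; }

    // Returns null after close, just like the closed source in the logs.
    Long snapshotState() { return closed ? null : lastEmittedSeqNo; }
}

// Mock checkpoint store: a null snapshot silently keeps the previous
// (stale) offset, which is then what a restore hands back.
class MockCheckpointStore {
    private long storedSeqNo;

    void checkpoint(MockSource source) {
        Long snapshot = source.snapshotState();
        if (snapshot != null) {
            storedSeqNo = snapshot;
        } // else: stale offset from the earlier checkpoint is retained
    }

    long restoredOffset() { return storedSeqNo; }
}
```

Under this model, a checkpoint after record 1, then record 2 followed by close-then-snapshot (the stop-with-savepoint ordering), leaves the store at offset 1; a restore therefore replays record 2, matching the duplication the reporter observed.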
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837533#comment-17837533 ]

Vadim Vararu commented on FLINK-35115:
--------------------------------------

FYI it's reproducible in a Kinesis -> Flink -> S3 sink setup also.
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vadim Vararu updated FLINK-35115:
---------------------------------
    Summary: Kinesis connector writes wrong Kinesis sequence number at stop with savepoint  (was: Kinesis connector writes wrong sequence number at stop with savepoint)
[jira] [Created] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint
Vadim Vararu created FLINK-35115:
------------------------------------

             Summary: Kinesis connector writes wrong sequence number at stop with savepoint
                 Key: FLINK-35115
                 URL: https://issues.apache.org/jira/browse/FLINK-35115
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.16.3
            Reporter: Vadim Vararu
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint

[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vadim Vararu updated FLINK-35115:
---------------------------------
    Affects Version/s: 1.18.1

> Kinesis connector writes wrong sequence number at stop with savepoint
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.3, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
> * Flink versions checked: 1.16.3 and 1.18.1
> * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
> * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> {code}
> * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliverGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build();
> {code}
> * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>("test-stream",
>         new AbstractDeserializationSchema<>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 return ByteBuffer.wrap(bytes);
>             }
>         }, props);
> {code}
>
>            Reporter: Vadim Vararu
>            Priority: Major
>              Labels: kinesis
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a stop-with-savepoint, Flink duplicates in Kafka all the records between the last checkpoint and the savepoint at resume:
> * Event1 is written to Kinesis
> * Event1 is processed by Flink
> * Event1 is committed to Kafka at the checkpoint
>
> * Event2 is written to Kinesis
> * Event2 is processed by Flink
> * Stop-with-savepoint is triggered manually
> * Event2 is committed to Kafka
>
> * Job is resumed from the savepoint
> * *{color:#FF0000}Event2 is written again to Kafka at the first checkpoint{color}*
>
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 reasons:{color}
> * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint, and it's the one from the checkpoint before the savepoint instead of being the one of the last record committed to Kafka.
> * I've tested exactly the same job with Kafka as the source instead of Kinesis, and the behaviour does not reproduce.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
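The failure mode described above can be modeled in isolation: if the source's savepoint state captures the sequence number of the last periodic *checkpoint* rather than that of the last record actually emitted (and committed to Kafka as part of the savepoint), the resumed job replays everything in between. A minimal toy sketch of that hypothesis — all class and method names here are illustrative, not the connector's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the suspected bug: on stop-with-savepoint the source
// persists the position from the last checkpoint instead of the
// position of the last record it emitted.
class ToySource {
    long lastCheckpointedSeq = -1; // captured at the last periodic checkpoint
    long currentSeq = -1;          // sequence number of the last emitted record

    void emit(long seq) { currentSeq = seq; }

    void checkpoint() { lastCheckpointedSeq = currentSeq; }

    // Suspected buggy behavior: savepoint stores the stale checkpointed position.
    long snapshotForSavepointBuggy() { return lastCheckpointedSeq; }

    // Expected behavior: savepoint stores the live position.
    long snapshotForSavepointFixed() { return currentSeq; }
}

public class SavepointReplayDemo {
    // Resume replays every record with a sequence number above the restored one.
    static List<Long> resume(long restoredSeq, List<Long> stream) {
        List<Long> emitted = new ArrayList<>();
        for (long seq : stream) {
            if (seq > restoredSeq) emitted.add(seq);
        }
        return emitted;
    }

    public static void main(String[] args) {
        ToySource source = new ToySource();
        source.emit(1);      // Event1 read from Kinesis
        source.checkpoint(); // Event1 committed to Kafka at the checkpoint
        source.emit(2);      // Event2 read; stop-with-savepoint then commits
                             // Event2 to Kafka as part of the savepoint

        List<Long> stream = List.of(1L, 2L);
        // Buggy savepoint restores seq 1, so Event2 (seq 2) is emitted again.
        System.out.println("buggy resume: " + resume(source.snapshotForSavepointBuggy(), stream));
        // Fixed savepoint restores seq 2, so nothing is replayed.
        System.out.println("fixed resume: " + resume(source.snapshotForSavepointFixed(), stream));
    }
}
```

This matches the reporter's _metadata observation: the sequence number in the savepoint is the one from the checkpoint before it, which is exactly the state that makes Event2 replay.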
[jira] [Created] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint
Vadim Vararu created FLINK-35115:
------------------------------------

             Summary: Kinesis connector writes wrong sequence number at stop with savepoint
                 Key: FLINK-35115
                 URL: https://issues.apache.org/jira/browse/FLINK-35115
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.16.3
            Reporter: Vadim Vararu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
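One caveat when reproducing the duplicate on the Kafka side: since the sink writes transactionally, the verification consumer must read with `read_committed` isolation, otherwise records from aborted transactions can be mistaken for duplicates. A hedged sketch of the consumer properties (broker address and group id are illustrative):

```java
import java.util.Properties;

public class DuplicateCheckConfig {
    public static Properties build() {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        consumerConfig.put("group.id", "duplicate-check");         // illustrative group id
        // See only records from committed transactions, matching the
        // EXACTLY_ONCE delivery guarantee of the sink above.
        consumerConfig.put("isolation.level", "read_committed");
        // Re-read the topic from the beginning so both Event1 and the
        // replayed Event2 are observed.
        consumerConfig.put("auto.offset.reset", "earliest");
        return consumerConfig;
    }
}
```

With these settings, seeing Event2 twice in `test-output-topic` confirms a genuine duplicate commit rather than a read of an uncommitted or aborted transaction.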