[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848851#comment-17848851 ] Vadim Vararu commented on FLINK-35115: -- Hi guys, Any idea when the aws-connector-4.3.0 is going to be released? > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838934#comment-17838934 ] Danny Cranmer commented on FLINK-35115: --- Merged commit [{{8d29147}}|https://github.com/apache/flink-connector-aws/commit/8d29147b9e6c0a7d27399662c6023ad634363764] into apache:main > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838894#comment-17838894 ] Vadim Vararu commented on FLINK-35115: -- Great, thanks for quick fix (y) > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838884#comment-17838884 ] Danny Cranmer commented on FLINK-35115: --- [~vadim.vararu] yes it will. 4.3.0 will support Flink 1.17 and 1.18 > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ] Vadim Vararu commented on FLINK-35115: -- [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth? > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853 ] Vadim Vararu commented on FLINK-35115: -- [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right? > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358 ] Aleksandr Pilipenko commented on FLINK-35115: - snapshotState method in FlinkKinesisConsumer skip saving state if operator had been cancelled: 2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null. This leads to state not being updated in state backend during stop-with-savepoint workflow. Created a PR to resolve this. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838216#comment-17838216 ] Aleksandr Pilipenko commented on FLINK-35115: - Investigating further > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838206#comment-17838206 ] Aleksandr Pilipenko commented on FLINK-35115: - [~vadim.vararu] thank you for information. Savepoint should produce similar log to one produced for checkpoint - mechanism is the same. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838121#comment-17838121 ] Vadim Vararu commented on FLINK-35115: -- [~a.pilipenko] Yes, I can reproduce this consistently. I've enabled this logger: {code:java} logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis logger.kinesis.level = DEBUG {code} and got these last logs on TM before triggering the stop-with-savepoint (the log at 2024-04-17 14:05:11,753 is the last checkpoint): {code:java} 2024-04-17 14:05:06,330 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ... 2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotting state ... 2024-04-17 14:05:11,753 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Snapshotted state, last processed sequence numbers: {StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression', shardId='shardId-', parentShardId='null', adjacentParentShardId='null', startingHashKey='0', endingHashKey='340282366920938463463374607431768211455', startingSequenceNumber='49618213417511572504838906841289148356109207047268990978', endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554}, checkpoint id: 1, timestamp: 1713351911711 2024-04-17 14:05:16,652 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ... 2024-04-17 14:05:26,930 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ... 2024-04-17 14:05:27,032 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 120 24-04-17 14:05:37,229 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ... 2024-04-17 14:05:43,079 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 1 2024-04-17 14:05:47,752 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 is trying to discover new shards that were created due to resharding ... 2024-04-17 14:05:50,677 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - stream: kinesis-dev-1-20210513-v3-contract-impression, shard: shardId-, millis behind latest: 0, batch size: 1{code} now I trigger the stop-with-savepoint: {code:java} 2024-04-17 14:05:52,168 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ... 2024-04-17 14:05:52,169 DEBUG org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Cancelled discovery 2024-04-17 14:05:52,169 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ... 2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null. 2024-04-17 14:05:52,669 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ... 2024-04-17 14:05:52,670 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ... {code} and here I start from the savepoint: {code:java} 2024-04-17 14:12:56,691 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554} 2024-04-17 14:12:58,370 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 is seeding the fetcher with restored shard StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49618213417511572504838906841289148356109207047268990978,}}'}, starting
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837562#comment-17837562 ] Aleksandr Pilipenko commented on FLINK-35115: - [~vadim.vararu] are you able to reproduce this issue consistently? Can you enable debug logs and share these logs here? Kinesis connectors logs information about sequence numbers it is storing during checkpointing, which may shed some light on the issue. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837533#comment-17837533 ] Vadim Vararu commented on FLINK-35115: -- FYI it's reproducible in Kinesis -> Flink -> S3 sink also. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837327#comment-17837327 ] Aleksandr Pilipenko commented on FLINK-35115: - [~dannycranmer] Agree, trying to reproduce it now > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837319#comment-17837319 ] Hong Liang Teoh commented on FLINK-35115: - [~dannycranmer] That is true, unless it is a problem with 2PC on the sink. The main difference for stop-with-savepoint is on the side effects executed on the sink. Let's try and replicate this first! > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837318#comment-17837318 ] Danny Cranmer commented on FLINK-35115: --- Stop with savepoint checkpointing logic is no different to regular checkpoints. This would be a more fundamental issue than limited to stop-with-savepoint. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837315#comment-17837315 ] Hong Liang Teoh commented on FLINK-35115: - [~a.pilipenko] Assigned to you, as you mentioned you are looking into it > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.3, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: kinesis > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)