[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-05-23 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848851#comment-17848851
 ] 

Vadim Vararu commented on FLINK-35115:
--

Hi guys,

 

Any idea when the aws-connector-4.3.0 is going to be released?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60_000, EXACTLY_ONCE); // 1 minute, as stated above
> execEnv.getCheckpointConfig().setCheckpointTimeout(9);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliverGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build();
> {code}
>  * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
>         "test-stream",
>         new AbstractDeserializationSchema<ByteBuffer>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 return ByteBuffer.wrap(bytes);
>             }
>         },
>         props);
> {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka, on resume, all the records between 
> the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink
>  * Event1 is committed to Kafka at the checkpoint
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first checkpoint{color}*
> 
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of Kinesis, 
> and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-19 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838934#comment-17838934
 ] 

Danny Cranmer commented on FLINK-35115:
---

Merged commit 
[{{8d29147}}|https://github.com/apache/flink-connector-aws/commit/8d29147b9e6c0a7d27399662c6023ad634363764]
 into apache:main 



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-19 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838894#comment-17838894
 ] 

Vadim Vararu commented on FLINK-35115:
--

Great, thanks for the quick fix (y)



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-19 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838884#comment-17838884
 ] 

Danny Cranmer commented on FLINK-35115:
---

[~vadim.vararu] Yes, it will. 4.3.0 will support Flink 1.17 and 1.18.



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?





[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

The snapshotState method in FlinkKinesisConsumer skips saving state if the operator has 
been cancelled:
2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.
This leads to the state not being updated in the state backend during the 
stop-with-savepoint workflow. Created a PR to resolve this.
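
To make the failure mode concrete, here is a minimal, hypothetical sketch of that pattern (the class, field and method names are illustrative only, not the actual FlinkKinesisConsumer source): once the source has been cancelled, the snapshot callback returns early, so the savepoint produced by stop-with-savepoint keeps the sequence numbers of the previous checkpoint rather than the latest processed ones.

{code:java}
// Hypothetical, simplified sketch of the behaviour described above.
// Names are illustrative; this is not the connector's real implementation.
import java.util.HashMap;
import java.util.Map;

class SequenceNumberSnapshotSketch {

    private volatile boolean running = true;
    // shardId -> last processed sequence number, updated as records are emitted
    private final Map<String, String> latestSequenceNumbers = new HashMap<>();
    // what gets written to the state backend on a checkpoint or savepoint
    private final Map<String, String> checkpointedSequenceNumbers = new HashMap<>();

    void cancel() {
        // In the logs above, the shard consumers are shut down before the final snapshot.
        running = false;
    }

    // Called for both checkpoints and savepoints.
    void snapshotState() {
        if (!running) {
            // Early return on a closed source: the savepoint ends up holding the
            // sequence numbers of the previous checkpoint, so those records are
            // replayed (and re-committed to the sink) after the job is resumed.
            return;
        }
        checkpointedSequenceNumbers.clear();
        checkpointedSequenceNumbers.putAll(latestSequenceNumbers);
    }
}
{code}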



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838216#comment-17838216
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

Investigating further



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838206#comment-17838206
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

[~vadim.vararu] thank you for the information. A savepoint should produce a log similar 
to the one produced for a checkpoint - the mechanism is the same.



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838121#comment-17838121
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] Yes, I can reproduce this consistently.

 

I've enabled this logger:

 
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG {code}
and got these last logs on TM before triggering the stop-with-savepoint (the 
log at 2024-04-17 14:05:11,753 is the last checkpoint):

 

 
{code:java}
2024-04-17 14:05:06,330 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotting state ...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotted state, last processed sequence numbers: 
{StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression',
 shardId='shardId-', parentShardId='null', 
adjacentParentShardId='null', startingHashKey='0', 
endingHashKey='340282366920938463463374607431768211455', 
startingSequenceNumber='49618213417511572504838906841289148356109207047268990978',
 
endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554},
 checkpoint id: 1, timestamp: 1713351911711

2024-04-17 14:05:16,652 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:26,930 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:27,032 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 120

2024-04-17 14:05:37,229 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:43,079 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 1

2024-04-17 14:05:47,752 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:50,677 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 1{code}
now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,169 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Cancelled discovery

2024-04-17 14:05:52,169 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...

2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.

2024-04-17 14:05:52,669 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,670 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...  {code}
and here I start from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting 
restore state in the FlinkKinesisConsumer. Using the following offsets: 
{org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}

2024-04-17 14:12:58,370 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 
0 is seeding the fetcher with restored shard 
StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', 
shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 
0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49618213417511572504838906841289148356109207047268990978,}}'}, starting 

[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-16 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837562#comment-17837562
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

[~vadim.vararu] are you able to reproduce this issue consistently?

Can you enable debug logs and share them here? The Kinesis connector logs 
information about the sequence numbers it stores during checkpointing, which 
may shed some light on the issue.
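
For reference, the kind of logging configuration that surfaces those entries is sketched below (assuming Flink's default Log4j 2 properties file; it matches the snippet the reporter shared elsewhere in this thread):

{code}
# Added to conf/log4j.properties on the TaskManager
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG
{code}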



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-16 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837533#comment-17837533
 ] 

Vadim Vararu commented on FLINK-35115:
--

FYI, it's also reproducible with a Kinesis -> Flink -> S3 sink.



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837327#comment-17837327
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

[~dannycranmer] Agree, trying to reproduce it now



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837319#comment-17837319
 ] 

Hong Liang Teoh commented on FLINK-35115:
-

[~dannycranmer] That is true, unless it is a problem with 2PC on the sink. The 
main difference for stop-with-savepoint lies in the side effects executed on the 
sink.

Let's try and replicate this first!



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837318#comment-17837318
 ] 

Danny Cranmer commented on FLINK-35115:
---

Stop-with-savepoint checkpointing logic is no different from regular checkpoints. 
This would then be a more fundamental issue, not one limited to stop-with-savepoint. 

 



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837315#comment-17837315
 ] 

Hong Liang Teoh commented on FLINK-35115:
-

[~a.pilipenko] Assigned to you, as you mentioned you are looking into it.
