[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-19 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838894#comment-17838894 ]
Vadim Vararu commented on FLINK-35115:
--

Great, thanks for the quick fix (y)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(6, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(9);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 48);
> 
> KafkaSink sink = KafkaSink.builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliverGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build();
> {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new FlinkKinesisConsumer<>("test-stream",
>         new AbstractDeserializationSchema<>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 return ByteBuffer.wrap(bytes);
>             }
>         }, props);
> {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint 
> causes Flink, on resume, to duplicate in Kafka all the records between the last 
> checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint instead of the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.
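
One way to confirm the duplication on the Kafka side is to re-read the output topic with 
read_committed isolation, so that only records from committed transactions are counted. 
This is a minimal sketch, not part of the original report; it assumes the 
"test-output-topic" from the setup above and the plain Kafka consumer client:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Only committed transactional records are returned, so open or aborted Flink
// transactions cannot be mistaken for duplicates.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("test-output-topic"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
        // The same payload at two different offsets after the resume indicates a duplicate.
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
{code}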



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ]

Vadim Vararu edited comment on FLINK-35115 at 4/19/24 4:54 AM:
---

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?


was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ]

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115 ]


Vadim Vararu deleted comment on FLINK-35115:
--

was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853 ]

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838121#comment-17838121 ]

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] Yes, I can reproduce this consistently.

I've enabled this logger:
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG
{code}
and got these last logs on the TM before triggering the stop-with-savepoint (the log 
at 2024-04-17 14:05:11,753 is the last checkpoint):
{code:java}
2024-04-17 14:05:06,330 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotting state ...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotted state, last processed sequence numbers: 
{StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression',
 shardId='shardId-', parentShardId='null', 
adjacentParentShardId='null', startingHashKey='0', 
endingHashKey='340282366920938463463374607431768211455', 
startingSequenceNumber='49618213417511572504838906841289148356109207047268990978',
 
endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554},
 checkpoint id: 1, timestamp: 1713351911711

2024-04-17 14:05:16,652 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:26,930 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:27,032 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 120

2024-04-17 14:05:37,229 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:43,079 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 1

2024-04-17 14:05:47,752 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:50,677 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-, millis behind latest: 0, batch size: 1{code}
now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,169 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Cancelled discovery

2024-04-17 14:05:52,169 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...

2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.

2024-04-17 14:05:52,669 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,670 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...  {code}
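
The "snapshotState() called on closed source; returning null" entry above looks like the 
key detail: the fetcher is already shut down when the savepoint snapshot is taken, so the 
snapshot cannot pick up the latest processed sequence number. Purely as an illustration of 
that suspected failure mode, here is a hypothetical, simplified sketch (invented names, not 
the actual FlinkKinesisConsumer code):
{code:java}
// Hypothetical sketch only; names and structure are invented for illustration.
class SimplifiedKinesisSource {

    private volatile boolean running = true;

    // Sequence number stored by the last successful checkpoint.
    private String checkpointedSequenceNumber = "seq-at-last-checkpoint";

    // Sequence number of the last record actually emitted downstream.
    private String currentSequenceNumber = "seq-of-last-emitted-record";

    void close() {
        // stop-with-savepoint shuts the consumer down before the savepoint snapshot runs
        running = false;
    }

    String snapshotState() {
        if (!running) {
            // Mirrors "snapshotState() called on closed source": the fresh position is
            // never captured, so the stale, previously checkpointed value is what ends
            // up in the savepoint.
            return checkpointedSequenceNumber;
        }
        checkpointedSequenceNumber = currentSequenceNumber;
        return currentSequenceNumber;
    }
}
{code}
If something along these lines happens, the savepoint carries the sequence number of the 
previous checkpoint, which matches what the reporter saw in the _metadata file.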
and here I start from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting 
restore state in the FlinkKinesisConsumer. Using the following offsets: 
{org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}

2024-04-17 14:12:58,370 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 
0 is seeding the fetcher with restored shard 
StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', 
shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 
0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49618213417511572504838906841289148356109207047268990978,}}'}, starting 

[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-16 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837533#comment-17837533 ]

Vadim Vararu commented on FLINK-35115:
--

FYI it's reproducible in Kinesis -> Flink -> S3 sink also.
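
For reference, the S3 variant mentioned above was presumably an exactly-once FileSink 
along these lines. This is a minimal sketch; the bucket, path and String element type are 
assumptions, only the FileSink row-format API itself is taken from Flink:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Row-format file sink writing to S3. With checkpointing enabled it finalizes (commits)
// part files only on successful checkpoints, i.e. the same two-phase commit pattern as
// the Kafka sink used in the original report.
FileSink<String> s3Sink = FileSink
        .forRowFormat(new Path("s3://example-bucket/flink-output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();
{code}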




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vadim Vararu updated FLINK-35115:
-
Summary: Kinesis connector writes wrong Kinesis sequence number at stop 
with savepoint  (was: Kinesis connector writes wrong sequence number at stop 
with savepoint)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)
Vadim Vararu created FLINK-35115:


 Summary: Kinesis connector writes wrong sequence number at stop 
with savepoint
 Key: FLINK-35115
 URL: https://issues.apache.org/jira/browse/FLINK-35115
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.16.3
 Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
exactly-once setup with:
 * Flink versions checked 1.16.3 and 1.18.1
 * Kinesis connector checked 1.16.3 and 4.2.0-1.18
 * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
{code:java}
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.enableCheckpointing(6, EXACTLY_ONCE);
execEnv.getCheckpointConfig().setCheckpointTimeout(9);
execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
{code}

 * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
{code:java}
Properties sinkConfig = new Properties();
sinkConfig.put("transaction.timeout.ms", 48);

KafkaSink sink = KafkaSink.builder()
        .setBootstrapServers("localhost:9092")
        .setTransactionalIdPrefix("test-prefix")
        .setDeliverGuarantee(EXACTLY_ONCE)
        .setKafkaProducerConfig(sinkConfig)
        .setRecordSerializer(
                (KafkaRecordSerializationSchema) (element, context, timestamp) ->
                        new ProducerRecord<>("test-output-topic", null, element.getBytes()))
        .build();
{code}

 * Kinesis consumer defined as: 
{code:java}
FlinkKinesisConsumer flinkKinesisConsumer = new FlinkKinesisConsumer<>("test-stream",
        new AbstractDeserializationSchema<>() {
            @Override
            public ByteBuffer deserialize(byte[] bytes) {
                return ByteBuffer.wrap(bytes);
            }
        }, props);
{code}
 
Reporter: Vadim Vararu


With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint 
causes Flink, on resume, to duplicate in Kafka all the records between the last 
checkpoint and the savepoint:
 * Event1 is written to Kinesis
 * Event1 is processed by Flink 
 * Event1 is committed to Kafka at the checkpoint
 * 
 * Event2 is written to Kinesis
 * Event2 is processed by Flink
 * Stop with savepoint is triggered manually
 * Event2 is committed to Kafka
 * 
 * Job is resumed from the savepoint
 * *{color:#FF}Event2 is written again to Kafka at the first 
checkpoint{color}*

 

{color:#172b4d}I believe that it's a Kinesis connector issue for 2 
reasons:{color}
 * I've checked the actual Kinesis sequence number in the _metadata file 
generated at stop-with-savepoint and it's the one from the checkpoint before 
the savepoint instead of the one of the last record committed to Kafka.
 * I've tested exactly the same job with Kafka as source instead of Kinesis as 
source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vadim Vararu updated FLINK-35115:
-
Affects Version/s: 1.18.1




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

