[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vadim Vararu updated FLINK-35115:
---------------------------------
    Affects Version/s: 1.18.1

> Kinesis connector writes wrong sequence number at stop with savepoint
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.3, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked: 1.16.3 and 1.18.1
>  * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
>  * checkpointing configured with a 1-minute interval in EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> // Checkpoint every 60 s in exactly-once mode, with a 90 s checkpoint timeout.
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
> KafkaSink<String> sink = KafkaSink.<String>builder()
>     .setBootstrapServers("localhost:9092")
>     .setTransactionalIdPrefix("test-prefix")
>     .setDeliveryGuarantee(EXACTLY_ONCE)
>     .setKafkaProducerConfig(sinkConfig)
>     .setRecordSerializer(
>         (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>             new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>     .build(); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer =
>     new FlinkKinesisConsumer<>("test-stream",
>         new AbstractDeserializationSchema<>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 // Pass the raw record payload through unchanged.
>                 return ByteBuffer.wrap(bytes);
>             }
>         }, props); {code}
>  
>            Reporter: Vadim Vararu
>            Priority: Major
>              Labels: kinesis
>
> In an exactly-once Kinesis -> Flink -> Kafka job, triggering a 
> stop-with-savepoint and then resuming from that savepoint makes Flink write 
> to Kafka a second time all the records processed between the last checkpoint 
> and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> ............................................................................
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> ............................................................................
>  * Job is resumed from the savepoint
>  * *{color:#FF0000}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
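The timeline above can be sketched as a toy model. This is only an illustration of the reported symptom under simplifying assumptions (a single shard, positions as list indexes); the class and method names are hypothetical and this is not the Kinesis connector's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the timeline above; hypothetical names, not connector code. */
public class SavepointReplayModel {

    /**
     * Resumes reading a stream from the position stored in a snapshot and
     * returns the records that would be read (and written to the sink) again.
     */
    static List<String> resume(List<String> stream, int restoredPosition) {
        // Everything at or after the restored position is re-read.
        return new ArrayList<>(stream.subList(restoredPosition, stream.size()));
    }

    public static void main(String[] args) {
        List<String> stream = List.of("Event1", "Event2");

        int checkpointPosition = 1; // position after Event1, stored by the periodic checkpoint
        int positionAtStop = 2;     // position after Event2, reached before stop-with-savepoint

        // Expected: the savepoint stores the position at stop, so nothing is replayed.
        System.out.println("expected replay: " + resume(stream, positionAtStop));

        // Reported: the savepoint stores the stale checkpoint position, so Event2 is replayed.
        System.out.println("reported replay: " + resume(stream, checkpointPosition));
    }
}
```

In this model the first call returns an empty list and the second returns only Event2, which matches the duplicate described in the last step.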
> I believe that it's a Kinesis connector issue for 2 reasons:
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka instead of Kinesis as the 
> source, and the behaviour does not reproduce.
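The first check above could be automated roughly as follows. This is a minimal sketch that assumes both sequence numbers have already been extracted by hand (from the _metadata file and from the last record seen in Kafka), that they belong to the same shard, and that they are decimal strings that increase over time within that shard. The helper name and the sample values are hypothetical.

```java
import java.math.BigInteger;

/** Hypothetical helper for comparing two Kinesis sequence numbers of the same shard. */
public class SequenceNumberCheck {

    /**
     * Returns true if the sequence number stored in the savepoint is older
     * than the sequence number of the last record committed to the sink.
     */
    static boolean savepointLagsBehind(String savepointSeq, String lastCommittedSeq) {
        // Sequence numbers are long decimal strings, so compare them
        // numerically rather than lexicographically.
        return new BigInteger(savepointSeq).compareTo(new BigInteger(lastCommittedSeq)) < 0;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the report.
        String fromSavepoint = "99";
        String lastCommitted = "100";
        System.out.println(savepointLagsBehind(fromSavepoint, lastCommitted));
    }
}
```

A result of true would correspond to the behaviour described in the report: the savepoint points to an earlier record than the last one already committed to Kafka.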



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
