[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837562#comment-17837562
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
---------------------------------------------

[~vadim.vararu] are you able to reproduce this issue consistently?

Can you enable debug logs and share these logs here? Kinesis connectors logs 
information about sequence numbers it is storing during checkpointing, which 
may shed some light on the issue.

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.16.3, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (60000,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (90000); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 480000);
> KafkaSink<String> sink = KafkaSink.<String>builder ()
>     .setBootstrapServers ("localhost:9092")
>     .setTransactionalIdPrefix ("test-prefix")
>     .setDeliverGuarantee (EXACTLY_ONCE)
>     .setKafkaProducerConfig (sinkConfig)
>     .setRecordSerializer (
>         (KafkaRecordSerializationSchema<String>) (element, context, 
> timestamp) -> new ProducerRecord<> (
>             "test-output-topic", null, element.getBytes ()))
>     .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new
>     FlinkKinesisConsumer<> ("test-stream",
>     new AbstractDeserializationSchema<> () {
>         @Override
>         public ByteBuffer deserialize (byte[] bytes) {
>             // Return
>             return ByteBuffer.wrap (bytes);
>         }
>     }, props); {code}
>  
>            Reporter: Vadim Vararu
>            Assignee: Aleksandr Pilipenko
>            Priority: Major
>              Labels: kinesis
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> ............................................................................
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> ............................................................................
>  * Job is resumed from the savepoint
>  * *{color:#FF0000}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to