Vadim Vararu created FLINK-35115:
------------------------------------

             Summary: Kinesis connector writes wrong sequence number at stop 
with savepoint
                 Key: FLINK-35115
                 URL: https://issues.apache.org/jira/browse/FLINK-35115
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.16.3
         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
exactly-once setup with:
 * Flink versions checked: 1.16.3 and 1.18.1
 * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
 * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
{code:java}
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
{code}

 * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
{code:java}
Properties sinkConfig = new Properties();
sinkConfig.put("transaction.timeout.ms", 480000);

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTransactionalIdPrefix("test-prefix")
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setKafkaProducerConfig(sinkConfig)
    .setRecordSerializer(
        (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
            new ProducerRecord<>("test-output-topic", null, element.getBytes()))
    .build();
{code}

 * Kinesis consumer defined as: 
{code:java}
FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
    "test-stream",
    new AbstractDeserializationSchema<>() {
        @Override
        public ByteBuffer deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes);
        }
    },
    props);
{code}
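For reference, {{props}} is not shown above; a minimal sketch of a consumer configuration that fits this setup (the region and initial position are assumptions, not the original values):
{code:java}
Properties props = new Properties();
// Region of the Kinesis stream (assumed value).
props.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// Start position used when the job has no restored state (assumed value).
props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
{code}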
 
            Reporter: Vadim Vararu


Given an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint and then resuming from that savepoint makes Flink duplicate in Kafka all the records processed between the last checkpoint and the savepoint:
 * Event1 is written to Kinesis
 * Event1 is processed by Flink 
 * Event1 is committed to Kafka at the checkpoint
 * ............................................................................
 * Event2 is written to Kinesis
 * Event2 is processed by Flink
 * Stop with savepoint is triggered manually (see the sketch after this list)
 * Event2 is committed to Kafka
 * ............................................................................
 * Job is resumed from the savepoint
 * *{color:#FF0000}Event2 is written again to Kafka at the first 
checkpoint{color}*
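For reference, a minimal sketch of the equivalent stop-with-savepoint call through the REST client (the REST address, job id and savepoint directory are placeholders, not the original values):
{code:java}
// Requires flink-clients; same effect as `flink stop --savepointPath <dir> <jobId>` on the CLI.
Configuration conf = new Configuration();
conf.setString(RestOptions.ADDRESS, "localhost"); // JobManager REST address (placeholder)

try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
    String savepointPath =
        client.stopWithSavepoint(
                JobID.fromHexString("<job id>"), // placeholder
                false,                           // do not drain the sources
                "file:///tmp/savepoints",        // placeholder savepoint directory
                SavepointFormatType.CANONICAL)
            .get();
    System.out.println("Savepoint written to " + savepointPath);
}
{code}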

 

I believe this is a Kinesis connector issue, for two reasons:
 * I've checked the actual Kinesis sequence number in the {{_metadata}} file generated at stop-with-savepoint, and it is the one from the checkpoint before the savepoint instead of the one of the last record committed to Kafka (a sketch of how this state can be read back is after this list).
 * I've tested exactly the same job with Kafka instead of Kinesis as the source, and the behaviour does not reproduce.
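For reference, a minimal sketch of how the sequence numbers stored in the savepoint can be read back with the State Processor API ({{flink-state-processor-api}}); the uid {{kinesis-source}} and the savepoint path are assumptions (use whatever uid the source actually has), while {{Kinesis-Stream-Shard-State}} is the operator state name used internally by {{FlinkKinesisConsumer}}:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
    SavepointReader.read(env, "file:///tmp/savepoints/savepoint-xxxx", new HashMapStateBackend());

// Mirrors the type information the connector registers for its shard state.
TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardStateType =
    new TupleTypeInfo<>(
        TypeInformation.of(StreamShardMetadata.class),
        TypeInformation.of(SequenceNumber.class));

savepoint
    .readUnionState("kinesis-source", "Kinesis-Stream-Shard-State", shardStateType)
    .print(); // each element is (shard metadata, last sequence number)

env.execute("inspect-kinesis-savepoint");
{code}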



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
