Vadim Vararu created FLINK-35115:
------------------------------------

             Summary: Kinesis connector writes wrong sequence number at stop with savepoint
                 Key: FLINK-35115
                 URL: https://issues.apache.org/jira/browse/FLINK-35115
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.16.3
         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
 * Flink versions checked: 1.16.3 and 1.18.1
 * Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
 * checkpointing configured at 1 minute with EXACTLY_ONCE mode:

{code:java}
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
{code}
 * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:

{code:java}
Properties sinkConfig = new Properties();
sinkConfig.put("transaction.timeout.ms", 480000);

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTransactionalIdPrefix("test-prefix")
    .setDeliverGuarantee(EXACTLY_ONCE)
    .setKafkaProducerConfig(sinkConfig)
    .setRecordSerializer(
        (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
            new ProducerRecord<>("test-output-topic", null, element.getBytes()))
    .build();
{code}
 * Kinesis consumer defined as:

{code:java}
FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
    "test-stream",
    new AbstractDeserializationSchema<>() {
        @Override
        public ByteBuffer deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes);
        }
    },
    props);
{code}
            Reporter: Vadim Vararu


Given an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint and later resuming from that savepoint makes Flink duplicate in Kafka all the records processed between the last checkpoint and the savepoint:
 * Event1 is written to Kinesis
 * Event1 is processed by Flink
 * Event1 is committed to Kafka at the checkpoint
 * ............................................................................
 * Event2 is written to Kinesis
 * Event2 is processed by Flink
 * Stop-with-savepoint is triggered manually
 * Event2 is committed to Kafka
 * ............................................................................
 * Job is resumed from the savepoint
 * *{color:#FF0000}Event2 is written again to Kafka at the first checkpoint{color}*

{color:#172b4d}I believe that it's a Kinesis connector issue, for 2 reasons:{color}
 * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint, and it is the one from the checkpoint before the savepoint instead of the one of the last record committed to Kafka.
 * I've tested exactly the same job with Kafka as the source instead of Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
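For reference, a minimal sketch of how the stop-with-savepoint and resume steps in the scenario above can be driven via the standard Flink CLI (the job id, savepoint directory, and jar path here are placeholders, not values from the report):

{code:bash}
# Stop the running job while taking a savepoint into the given directory
# (<jobId> is the id printed when the job was submitted)
./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>

# Resume the job from the savepoint path printed by the stop command;
# the duplicate Event2 write is observed after the first checkpoint here
./bin/flink run -s /tmp/flink-savepoints/savepoint-<id> ./test-job.jar
{code}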