Alex Sorokoumov created FLINK-31620:
---------------------------------------
Summary: ReducingUpsertWriter does not flush the wrapped writer
Key: FLINK-31620
URL: https://issues.apache.org/jira/browse/FLINK-31620
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Alex Sorokoumov
According to {{SinkWriter#flush}}
[javadoc|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java#L43-L47],
the writer must flush its records to guarantee AT_LEAST_ONCE.
{{upsert-kafka}}'s {{ReducingUpsertWriter}} inserts buffered records into the
wrapped writer, but does not flush it:
1. SinkWriter#flush implementation -
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L88-L91.
2. The actual flush code -
https://github.com/apache/flink/blob/f3c653ed2e4264315ed83a5b4b2494a7dcc41474/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L143-L150.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)