GitHub user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6387#discussion_r204346197
--- Diff:
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
---
@@ -84,7 +93,14 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+ final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer08<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
+ // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
--- End diff --
ditto
---