你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once

Kafka011TableSink


@Override
protected SinkFunction<Row> createKafkaProducer(
      String topic,
      Properties properties,
      SerializationSchema<Row> serializationSchema,
      Optional<FlinkKafkaPartitioner<Row>> partitioner) {
   return new FlinkKafkaProducer011<>(
      topic,
      new KeyedSerializationSchemaWrapper<>(serializationSchema),
      properties,
      partitioner,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      5);
}
如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase

参考: 
https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
------------------------------------------------------------------
发件人:静谧雨寒 <freedom0...@vip.qq.com>
发送时间:2020年7月1日(星期三) 14:33
收件人:user-zh <user-zh@flink.apache.org>
主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

&nbsp;flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql 
sink表使用两阶事务提交,exactly-once一致性保证 ?
官档说法:
Consistency guarantees: By default, a Kafka sink ingests data with 
at-least-once guarantees into a Kafka topic if the query is executed with 
checkpointing enabled.,&nbsp;&nbsp;
CREATE TABLE 默认是 at-least-once

Reply via email to