[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995396#comment-16995396 ]
chaiyongqiang commented on FLINK-14719:
---------------------------------------

[~jark], thanks. I'll handle this myself on the Flink 1.8 branch and will create a PR for the development branch. I hope it can make it into Flink 1.10, which will be released next month. Please close this issue.

> Making Semantic configurable in Flinkkafkaproducer to support exactly-once
> semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and
> FlinkKafkaProducer011. With the DataStream API, this makes exactly-once
> semantics achievable. With the Table API, things are different.
> The createKafkaProducer method in KafkaTableSink is used to create the
> FlinkKafkaProducer that sends messages to the Kafka server.
> It looks like this:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>     String topic,
>     Properties properties,
>     SerializationSchema<Row> serializationSchema,
>     Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>     return new FlinkKafkaProducer<>(
>         topic,
>         new KeyedSerializationSchemaWrapper<>(serializationSchema),
>         properties,
>         partitioner);
> }
> {code}
> Following this call into the FlinkKafkaProducer constructor shows that it
> always yields a producer with at-least-once semantics:
> {code:java}
> public FlinkKafkaProducer(
>     String defaultTopicId,
>     KeyedSerializationSchema<IN> serializationSchema,
>     Properties producerConfig,
>     Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         defaultTopicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner,
>         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
> {code}
> As a result, users cannot achieve exactly-once semantics when using the
> Table API.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
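
For illustration only, here is a rough sketch of how a configurable semantic might be resolved before it is handed to the six-argument FlinkKafkaProducer constructor quoted in the issue. It is not the change proposed in FLINK-14719. The property key "sink.semantic" and the class SemanticConfigSketch are hypothetical, and the nested Semantic enum is a stand-in that mirrors the constant names of FlinkKafkaProducer.Semantic so the snippet compiles without Flink on the classpath.

```java
import java.util.Locale;
import java.util.Properties;

final class SemanticConfigSketch {

    // Stand-in for FlinkKafkaProducer.Semantic (same constant names),
    // declared here only so the sketch compiles without Flink.
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Hypothetical property key; an assumption, not an existing Flink option.
    static final String SEMANTIC_KEY = "sink.semantic";

    // Reads the semantic from the properties and falls back to
    // AT_LEAST_ONCE, the value hard-coded in the constructor quoted above.
    static Semantic resolveSemantic(Properties properties) {
        String raw = properties.getProperty(SEMANTIC_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return Semantic.AT_LEAST_ONCE;
        }
        // Accept "exactly-once" as well as "EXACTLY_ONCE".
        String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        // Enum.valueOf throws IllegalArgumentException for unknown values.
        return Semantic.valueOf(normalized);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SEMANTIC_KEY, "exactly-once");
        System.out.println(resolveSemantic(props));
    }
}
```

Under these assumptions, createKafkaProducer could pass the resolved value in place of the hard-coded FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, so existing jobs that do not set the key keep their current behavior.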