[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995396#comment-16995396
 ] 

chaiyongqiang commented on FLINK-14719:
---------------------------------------

[~jark], thanks.  I'll handle  this myself  on flink1.8  branch and will create 
a pr for developing branch.  Wish i could catch flink1.10 which will be 
released next month. 
Please close this issue. 

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once 
> semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports kafka transaction with FlinkKafkaProducer and 
> FlinkKafkaProducer011 . When we use Datastream API , it's able to realize 
> exactly once semantic .  But when we use Table API, things are different. 
> The createKafkaProducer method in KafkaTableSink is used to create 
> FlinkKafkaProducer to sending messages to Kafka server.  It's like :
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>               String topic,
>               Properties properties,
>               SerializationSchema<Row> serializationSchema,
>               Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>               return new FlinkKafkaProducer<>(
>                       topic,
>                       new 
> KeyedSerializationSchemaWrapper<>(serializationSchema),
>                       properties,
>                       partitioner);
>       }
> {code}
> when we get into the constructor of FlinkKafkaProducer we can see this will 
> lead to an at_least_once semantic producer :
> {code:java}
>       public FlinkKafkaProducer(
>               String defaultTopicId,
>               KeyedSerializationSchema<IN> serializationSchema,
>               Properties producerConfig,
>               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>               this(
>                       defaultTopicId,
>                       serializationSchema,
>                       producerConfig,
>                       customPartitioner,
>                       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>                       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>       }
> {code}
> This makes user could not achieve exactly-once semantic when using Table API. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to