[ https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855736#comment-15855736 ]

ASF GitHub Bot commented on FLINK-5702:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3282#discussion_r99791642
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.
     
     ### Kafka Producer
     
    -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
    -records to partitions.
    +Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
    +It allows writing a stream of records to one or more Kafka topics.
     
     Example:
     
    -
     <div class="codetabs" markdown="1">
     <div data-lang="java, Kafka 0.8+" markdown="1">
     {% highlight java %}
    -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
    +DataStream<String> stream = ...;
    +
    +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
    +        "localhost:9092",            // broker list
    +        "my-topic",                  // target topic
    +        new SimpleStringSchema());   // serialization schema
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducer.setLogFailuresOnly(false);   // "false" by default
    +myProducer.setFlushOnCheckpoint(true);  // "false" by default
    +
    +stream.addSink(myProducer);
     {% endhighlight %}
     </div>
     <div data-lang="java, Kafka 0.10+" markdown="1">
     {% highlight java %}
    -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
    +DataStream<String> stream = ...;
    +
    +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
    +        stream,                     // input stream
    +        "my-topic",                 // target topic
    +        new SimpleStringSchema(),   // serialization schema
    +        properties);                // custom configuration for KafkaProducer (including broker list)
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducerConfig.setLogFailuresOnly(false);   // "false" by default
    +myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
     {% endhighlight %}
     </div>
     <div data-lang="scala, Kafka 0.8+" markdown="1">
     {% highlight scala %}
    -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
    +val stream: DataStream[String] = ...
    +
    +val myProducer = new FlinkKafkaProducer08[String](
    +        "localhost:9092",         // broker list
    +        "my-topic",               // target topic
    +        new SimpleStringSchema)   // serialization schema
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducer.setLogFailuresOnly(false)   // "false" by default
    +myProducer.setFlushOnCheckpoint(true)  // "false" by default
    +
    +stream.addSink(myProducer)
     {% endhighlight %}
     </div>
     <div data-lang="scala, Kafka 0.10+" markdown="1">
     {% highlight scala %}
    -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
    +val stream: DataStream[String] = ...
    +
    +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
    +        stream,                   // input stream
    +        "my-topic",               // target topic
    +        new SimpleStringSchema,   // serialization schema
    +        properties)               // custom configuration for KafkaProducer (including broker list)
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducerConfig.setLogFailuresOnly(false)   // "false" by default
    +myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
     {% endhighlight %}
     </div>
     </div>
     
    -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
    -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
    -Kafka Producers.
    -
    -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
    -serializing the key and value separately. It also allows to override the target topic id, so that
    -one producer instance can send data to multiple topics.
    -
    -The interface of the serialization schema is called `KeyedSerializationSchema`.
    -
    +The above demonstrates the basic usage of creating a Flink Kafka Producer
    +to write streams to a single Kafka target topic. For more advanced usages, there
    +are other constructor variants that allow providing the following:
    +
    + * *Custom configuration for the internal Kafka client*:
    + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
    + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
    + details on how to configure Kafka Producers.
    + * *Custom partitioner*: To assign records to specific
    + partitions, you can provide an implementation of a `KafkaProducer` to the
    --- End diff --
    
    Do you actually mean `KafkaProducer` here?
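
If the intended class is `KafkaPartitioner` (the partitioner base class shipped with the Flink Kafka connectors at the time) rather than `KafkaProducer` (the internal Kafka client), a rough sketch of a custom partitioner could look like the following. The package and the exact `partition(...)` signature are assumptions based on the connector versions around this PR, and the class name is made up for illustration:

    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    // Illustrative only: a partitioner that sends every record to partition 0.
    // Base class location and method signature are assumed, not taken from this PR.
    public class SinglePartitionPartitioner extends KafkaPartitioner<String> {

        @Override
        public int partition(String next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            return 0;
        }
    }

Such an instance would then be handed to one of the producer constructor variants that accept a partitioner argument.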


> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is 
> compromised
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5702
>                 URL: https://issues.apache.org/jira/browse/FLINK-5702
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about
> the {{setLogFailuresOnly}} setting. It should emphasize that if users choose to
> only log failures instead of failing the sink, at-least-once cannot be
> guaranteed.
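
To illustrate the issue's point with the setters shown in the documentation diff above: leaving `setLogFailuresOnly` at `false` and enabling `setFlushOnCheckpoint` preserves at-least-once, while switching `setLogFailuresOnly` to `true` means asynchronous send failures are only logged, so records can be dropped without failing the job. A minimal, hypothetical sketch follows; the broker address and topic are placeholders, import paths assume the connector packaging of that era, and the class/method names are made up for illustration:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerGuaranteeSketch {

        // At-least-once: fail the sink on send errors and flush pending
        // records before a checkpoint completes (as documented in the diff).
        static void addAtLeastOnceSink(DataStream<String> stream) {
            FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<String>(
                    "localhost:9092", "my-topic", new SimpleStringSchema());
            producer.setLogFailuresOnly(false);
            producer.setFlushOnCheckpoint(true);
            stream.addSink(producer);
        }

        // NOT at-least-once: send failures are merely logged, so records that
        // fail to be written are dropped without failing the job.
        static void addLogOnlySink(DataStream<String> stream) {
            FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<String>(
                    "localhost:9092", "my-topic", new SimpleStringSchema());
            producer.setLogFailuresOnly(true);
            stream.addSink(producer);
        }
    }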



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
