[ https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855735#comment-15855735 ]
ASF GitHub Bot commented on FLINK-5702:
---------------------------------------

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3282#discussion_r99792130

--- Diff: docs/dev/connectors/kafka.md ---
@@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.
 
 ### Kafka Producer
 
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
 
 Example:
-
 
 <div class="codetabs" markdown="1">
 <div data-lang="java, Kafka 0.8+" markdown="1">
 {% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+        "localhost:9092",            // broker list
+        "my-topic",                  // target topic
+        new SimpleStringSchema());   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false);   // "false" by default
+myProducer.setFlushOnCheckpoint(true);  // "false" by default
+
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
 <div data-lang="java, Kafka 0.10+" markdown="1">
 {% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                     // input stream
+        "my-topic",                 // target topic
+        new SimpleStringSchema(),   // serialization schema
+        properties);                // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false);   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.8+" markdown="1">
 {% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+        "localhost:9092",         // broker list
+        "my-topic",               // target topic
+        new SimpleStringSchema)   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false)   // "false" by default
+myProducer.setFlushOnCheckpoint(true)  // "false" by default
+
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.10+" markdown="1">
 {% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                    // input stream
+        "my-topic",                // target topic
+        new SimpleStringSchema,    // serialization schema
+        properties)                // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false)   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
 {% endhighlight %}
 </div>
 </div>
 
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
+The above demonstrates the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usages, there
+are other constructor variants that allow providing the following:
+
+ * *Custom configuration for the internal Kafka client*:
+   The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+   Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+   details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+   partitions, you can provide an implementation of a `KafkaPartitioner` to the
+   constructor. This partitioner will be called for each record in the stream
+   to determine which exact partition the record will be sent to.
+ * *Advanced serialization schema*: Similar to the consumer,
+   the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
+   which allows serializing the key and value separately. It also allows overriding the target topic,
+   so that one producer instance can send data to multiple topics.
+
+The example also shows how to configure the Flink Kafka Producer for at-least-once

--- End diff --

Should we move the at-least-once configuration up the hierarchy and make it a subheading?

> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5702
>                 URL: https://issues.apache.org/jira/browse/FLINK-5702
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about
> the {{setLogFailuresOnly}}.
> It should emphasize that if users choose to only
> log failures instead of failing the sink, at-least-once cannot be guaranteed.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
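As a side note on the `KeyedSerializationSchema` mentioned in the diff: it is a small interface. The sketch below assumes the Flink 1.2 shape of that interface (`serializeKey`, `serializeValue`, `getTargetTopic`) and declares a local stand-in for it so the example compiles without Flink on the classpath; the `KeyValueSchema` class, the topic names, and the "key:value" record format are hypothetical, for illustration only.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for Flink's KeyedSerializationSchema (in Flink 1.2:
// org.apache.flink.streaming.util.serialization.KeyedSerializationSchema),
// declared here only so this sketch compiles without Flink on the classpath.
interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical schema: treats each record as a "key:value" string, serializes
// the key and value separately, and routes records whose key starts with
// "err" to a dedicated topic.
class KeyValueSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        String[] parts = element.split(":", 2);
        return (parts.length > 1 ? parts[1] : "").getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // returning null lets the producer fall back to its default target topic
        return element.split(":", 2)[0].startsWith("err") ? "error-topic" : null;
    }
}
```

In an actual job, such a schema would be passed to the `FlinkKafkaProducer08`/`FlinkKafkaProducer010` constructor in place of the `SimpleStringSchema` shown in the diff.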