Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3282#discussion_r99790794 --- Diff: docs/dev/connectors/kafka.md --- @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp. ### Kafka Producer -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns -records to partitions. +Flinkâs Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.). +It allows writing a stream of records to one or more Kafka topics. Example: - <div class="codetabs" markdown="1"> <div data-lang="java, Kafka 0.8+" markdown="1"> {% highlight java %} -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema())); +DataStream<String> stream = ...; + +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>( + "localhost:9092", // broker list + "my-topic", // target topic + new SimpleStringSchema()); // serialization schema + +// the following is necessary for at-least-once delivery guarantee +myProducer.setLogFailuresOnly(false); // "false" by default +myProducer.setFlushOnCheckpoint(true); // "false" by default + +stream.addSink(myProducer); {% endhighlight %} </div> <div data-lang="java, Kafka 0.10+" markdown="1"> {% highlight java %} -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties); +DataStream<String> stream = ...; + +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps( + stream, // input stream + "my-topic", // target topic + new SimpleStringSchema(), // serialization schema + properties); // custom configuration for KafkaProducer (including broker list) + +// the following is necessary for at-least-once delivery guarantee +myProducerConfig.setLogFailuresOnly(false); // "false" by default +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default {% endhighlight %} </div> <div data-lang="scala, Kafka 0.8+" markdown="1"> {% highlight scala %} -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema())) +val stream: DataStream[String] = ... + +val myProducer = new FlinkKafkaProducer08[String]( + "localhost:9092", // broker list + "my-topic", // target topic + new SimpleStringSchema) // serialization schema + +// the following is necessary for at-least-once delivery guarantee +myProducer.setLogFailuresOnly(false) // "false" by default +myProducer.setFlushOnCheckpoint(true) // "false" by default + +stream.addSink(myProducer) {% endhighlight %} </div> <div data-lang="scala, Kafka 0.10+" markdown="1"> {% highlight scala %} -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties); +val stream: DataStream[String] = ... + +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps( + stream, // input stream + "my-topic", // target topic + new SimpleStringSchema, // serialization schema + properties) // custom configuration for KafkaProducer (including broker list) + +// the following is necessary for at-least-once delivery guarantee +myProducerConfig.setLogFailuresOnly(false) // "false" by default +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default {% endhighlight %} </div> </div> -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure -Kafka Producers. - -Similar to the consumer, the producer also allows using an advanced serialization schema which allows -serializing the key and value separately. It also allows to override the target topic id, so that -one producer instance can send data to multiple topics. - -The interface of the serialization schema is called `KeyedSerializationSchema`. - +The above demonstrates the basic usage of creating a Flink Kafka Producer --- End diff -- Maybe `The above examples demonstrate...`?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---