Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3282#discussion_r99790794
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which 
timestamp.
     
     ### Kafka Producer
     
    -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can 
specify a custom partitioner that assigns
    -records to partitions.
    +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for 
Kafka 0.9.0.x versions, etc.).
    +It allows writing a stream of records to one or more Kafka topics.
     
     Example:
     
    -
     <div class="codetabs" markdown="1">
     <div data-lang="java, Kafka 0.8+" markdown="1">
     {% highlight java %}
    -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", 
"my-topic", new SimpleStringSchema()));
    +DataStream<String> stream = ...;
    +
    +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
    +        "localhost:9092",            // broker list
    +        "my-topic",                  // target topic
    +        new SimpleStringSchema());   // serialization schema
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducer.setLogFailuresOnly(false);   // "false" by default
    +myProducer.setFlushOnCheckpoint(true);  // "false" by default
    +
    +stream.addSink(myProducer);
     {% endhighlight %}
     </div>
     <div data-lang="java, Kafka 0.10+" markdown="1">
     {% highlight java %}
    -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new 
SimpleStringSchema(), properties);
    +DataStream<String> stream = ...;
    +
    +FlinkKafkaProducer010Configuration myProducerConfig = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(
    +        stream,                     // input stream
    +        "my-topic",                 // target topic
    +        new SimpleStringSchema(),   // serialization schema
    +        properties);                // custom configuration for 
KafkaProducer (including broker list)
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducerConfig.setLogFailuresOnly(false);   // "false" by default
    +myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
     {% endhighlight %}
     </div>
     <div data-lang="scala, Kafka 0.8+" markdown="1">
     {% highlight scala %}
    -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", 
"my-topic", new SimpleStringSchema()))
    +val stream: DataStream[String] = ...
    +
    +val myProducer = new FlinkKafkaProducer08[String](
    +        "localhost:9092",         // broker list
    +        "my-topic",               // target topic
    +        new SimpleStringSchema)   // serialization schema
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducer.setLogFailuresOnly(false)   // "false" by default
    +myProducer.setFlushOnCheckpoint(true)  // "false" by default
    +
    +stream.addSink(myProducer)
     {% endhighlight %}
     </div>
     <div data-lang="scala, Kafka 0.10+" markdown="1">
     {% highlight scala %}
    -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new 
SimpleStringSchema(), properties);
    +val stream: DataStream[String] = ...
    +
    +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
    +        stream,                   // input stream
    +        "my-topic",               // target topic
    +        new SimpleStringSchema,   // serialization schema
    +        properties)               // custom configuration for 
KafkaProducer (including broker list)
    +
    +// the following is necessary for at-least-once delivery guarantee
    +myProducerConfig.setLogFailuresOnly(false)   // "false" by default
    +myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
     {% endhighlight %}
     </div>
     </div>
     
    -You can also define a custom Kafka producer configuration for the 
KafkaSink with the constructor. Please refer to
    -the [Apache Kafka 
documentation](https://kafka.apache.org/documentation.html) for details on how 
to configure
    -Kafka Producers.
    -
    -Similar to the consumer, the producer also allows using an advanced 
serialization schema which allows
    -serializing the key and value separately. It also allows to override the 
target topic id, so that
    -one producer instance can send data to multiple topics.
    -
    -The interface of the serialization schema is called 
`KeyedSerializationSchema`.
    -
    +The above demonstrates the basic usage of creating a Flink Kafka Producer
    --- End diff --
    
    Maybe `The above examples demonstrate...`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to