Repository: flink
Updated Branches:
  refs/heads/release-1.2 87697f46a -> bcace0654


[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

This closes #3282.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcace065
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcace065
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcace065

Branch: refs/heads/release-1.2
Commit: bcace0654de1c5e9e854a554a782d24822391d01
Parents: 87697f4
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Tue Feb 7 14:32:28 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Feb 7 23:27:16 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md | 138 ++++++++++++++++++++++++++++----------
 1 file changed, 103 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcace065/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6a58b7a..a727f85 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
 
 Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
@@ -84,14 +87,14 @@ Then, import the connector in your maven project:
 
 Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
 
-### Installing Apache Kafka
+## Installing Apache Kafka
 
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
-### Kafka Consumer
+## Kafka Consumer
 
-Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 0.9.0.x versions). It provides access to one or more Kafka topics.
+Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 0.9.0.x versions, etc.). It provides access to one or more Kafka topics.
 
 The constructor accepts the following arguments:
 
@@ -137,7 +140,7 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.
 
-#### The `DeserializationSchema`
+### The `DeserializationSchema`
 
 The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
@@ -161,7 +164,7 @@ For convenience, Flink provides the following schemas:
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
     an optional "metadata" field that exposes the offset/partition/topic for this message.
 
-#### Kafka Consumers and Fault Tolerance
+### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
 its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
@@ -193,7 +196,7 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
-#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+### Kafka Consumers and Timestamp Extraction/Watermark Emission
 
 In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
 In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
@@ -248,59 +251,124 @@ the `Watermark getCurrentWatermark()` (for periodic) or the
 if a new watermark should be emitted and with which timestamp.
 
 
-### Kafka Producer
+## Kafka Producer
 
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
 
 Example:
 
-
 <div class="codetabs" markdown="1">
 <div data-lang="java, Kafka 0.8+" markdown="1">
 {% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+        "localhost:9092",            // broker list
+        "my-topic",                  // target topic
+        new SimpleStringSchema());   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false);   // "false" by default
+myProducer.setFlushOnCheckpoint(true);  // "false" by default
+
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
 <div data-lang="java, Kafka 0.10+" markdown="1">
 {% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                     // input stream
+        "my-topic",                 // target topic
+        new SimpleStringSchema(),   // serialization schema
+        properties);                // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false);   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.8+" markdown="1">
 {% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+        "localhost:9092",         // broker list
+        "my-topic",               // target topic
+        new SimpleStringSchema)   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false)   // "false" by default
+myProducer.setFlushOnCheckpoint(true)  // "false" by default
+
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.10+" markdown="1">
 {% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                   // input stream
+        "my-topic",               // target topic
+        new SimpleStringSchema,   // serialization schema
+        properties)               // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false)   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
 {% endhighlight %}
 </div>
 </div>
 
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
-
-**Note**: By default, the number of retries is set to "0". This means that the producer fails immediately on errors,
-including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic.
-For most production environments with frequent broker changes, we recommend setting the number of retries to a
-higher value.
-
-There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
+The above examples demonstrate the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced use cases, there
+are other constructor variants that allow providing the following:
+
+ * *Providing custom properties*:
+ The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+ Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+ details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+ partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ constructor. This partitioner will be called for each record in the stream
+ to determine which exact partition the record will be sent to.
+ * *Advanced serialization schema*: Similar to the consumer,
+ the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
+ which allows serializing the key and value separately. It also allows overriding the target topic,
+ so that one producer instance can send data to multiple topics.
+ 
+### Kafka Producers and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Kafka Producer can provide
+at-least-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you should also configure the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
+as shown in the examples in the previous section:
+
+ * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ instead of catching and rethrowing them. This essentially treats the record
+ as having succeeded, even if it was never written to the target Kafka topic. This
+ must be disabled for at-least-once.
+ * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ in-flight records at the time of the checkpoint to be acknowledged by Kafka before
+ the checkpoint succeeds. This ensures that all records before the checkpoint have
+ been written to Kafka. This must be enabled for at-least-once.
+
+**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
+the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
+duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
+we recommend setting the number of retries to a higher value.
+
+**Note**: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
 into a Kafka topic.
 
-### Using Kafka timestamps and Flink event time in Kafka 0.10
+## Using Kafka timestamps and Flink event time in Kafka 0.10
 
-Since Apache Kafka 0.10., Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
+Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
 the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
 has been written to the Kafka broker.
 
@@ -331,7 +399,7 @@ config.setWriteTimestampToKafka(true);
 
 
 
-### Kafka Connector metrics
+## Kafka Connector metrics
 
 Flink's Kafka connectors provide some metrics through Flink's [metrics system]({{ site.baseurl }}/monitoring/metrics.html) to analyze
 the behavior of the connector.
@@ -354,7 +422,7 @@ the committed offset and the most recent offset in each partition is called the
 the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind.
 For large production deployments we recommend monitoring that metric to avoid increasing latency.
 
-### Enabling Kerberos Authentication (for versions 0.9+ and above only)
+## Enabling Kerberos Authentication (for versions 0.9+ and above only)
 
 Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation
 configured for Kerberos. Simply configure Flink in `flink-conf.yaml` to enable Kerberos authentication for Kafka like so:
