This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd768722649 KAFKA-13709: Add docs for exactly-once support in Connect (#12941)
dd768722649 is described below

commit dd7687226495b86b8e35dea1ca1749e71bccb2ea
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Thu Dec 15 11:20:49 2022 -0500

    KAFKA-13709: Add docs for exactly-once support in Connect (#12941)
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>
---
 .../runtime/distributed/DistributedConfig.java     |   9 +-
 docs/connect.html                                  | 211 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9544b794114..8c75f3a4076 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -232,9 +232,12 @@ public class DistributedConfig extends WorkerConfig {
 
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
-            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. ";
-            // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
-            //       + "See the exactly-once source support documentation at [add docs link here] for more information on this feature.";
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\n"
+            + "To enable exactly-once source support on a new cluster, set this property to '" + ExactlyOnceSourceSupport.ENABLED + "'. "
+            + "To enable support on an existing cluster, first set to '" + ExactlyOnceSourceSupport.PREPARING + "' on every worker in the cluster, "
+            + "then set to '" + ExactlyOnceSourceSupport.ENABLED + "'. A rolling upgrade may be used for both changes. "
+            + "For more information on this feature, see the "
+            + "<a href=\"https://kafka.apache.org/documentation.html#connect_exactlyoncesource\">exactly-once source support documentation</a>.";
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
 
     private static Object defaultKeyGenerationAlgorithm(Crypto crypto) {
diff --git a/docs/connect.html b/docs/connect.html
index ba960ed5c22..fca18ddf7d1 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -369,6 +369,114 @@ errors.deadletterqueue.topic.name=my-connector-errors
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once 
support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exa [...]
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink 
connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, you can enable exactly-once delivery at the Connect worker level by ensuring that its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config overri [...]
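+
+    <p>For example, a minimal sketch of the relevant properties; the <code>consumer.override.</code> form only takes effect if the worker's connector client config override policy permits it:</p>
+
+<pre>
+# In the worker configuration: applies to the consumers of all sink connectors on this worker
+consumer.isolation.level=read_committed
+
+# Or, in an individual connector's configuration
+consumer.override.isolation.level=read_committed</pre>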
+
+    <h5><a id="connect_exactlyoncesource" 
href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, you must configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
+
+    <h6>Worker configuration</h6>
+
+    <p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p>
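+
+    <p>As a sketch, the worker property involved in the two rolling upgrades (all other worker properties omitted):</p>
+
+<pre>
+# First rolling upgrade: set on every worker in the cluster
+exactly.once.source.support=preparing
+
+# Second rolling upgrade, once every worker has been prepared
+exactly.once.source.support=enabled</pre>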
+
+    <h6>ACL requirements</h6>
+
+    <p>With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:</p>
+
+    <table class="data-table">
+        <thead>
+            <tr>
+                <th>Operation</th>
+                <th>Resource Type</th>
+                <th>Resource Name</th>
+                <th>Note</th>
+            </tr>
+        </thead>
+        <tbody>
+            <tr>
+                <td>Write</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>IdempotentWrite</td>
+                <td>Cluster</td>
+                <td>ID of the Kafka cluster that hosts the worker's config topic</td>
+                <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+            </tr>
+        </tbody>
+    </table>
+
+    <p>And the principal for each individual connector will require the following ACLs:</p>
+
+    <table class="data-table">
+        <thead>
+            <tr>
+                <th>Operation</th>
+                <th>Resource Type</th>
+                <th>Resource Name</th>
+                <th>Note</th>
+            </tr>
+        </thead>
+        <tbody>
+            <tr>
+                <td>Write</td>
+                <td>TransactionalId</td>
+                <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+                <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>TransactionalId</td>
+                <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+                <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
+            </tr>
+            <tr>
+                <td>Write</td>
+                <td>Topic</td>
+                <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Read</td>
+                <td>Topic</td>
+                <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>Topic</td>
+                <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Create</td>
+                <td>Topic</td>
+                <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
+                <td>Only necessary if the offsets topic for the connector does not exist yet</td>
+            </tr>
+            <tr>
+                <td>IdempotentWrite</td>
+                <td>Cluster</td>
+                <td>ID of the Kafka cluster that the source connector writes to</td>
+                <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+            </tr>
+        </tbody>
+    </table>
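+
+    <p>As an illustration, the following sketch grants the transactional ID ACLs from the tables above using the Kafka <code>Admin</code> client; the principals (<code>User:connect-worker</code>, <code>User:my-connector</code>), the <code>group.id</code> (<code>my-connect-cluster</code>), and the connector name (<code>my-connector</code>) are all hypothetical:</p>
+
+<pre class="brush: java;">
+try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
+    // Worker principal: write access to the cluster's transactional ID, connect-cluster-${groupId}
+    AclBinding workerTransactionalId = new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "connect-cluster-my-connect-cluster", PatternType.LITERAL),
+            new AccessControlEntry("User:connect-worker", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
+    // Connector principal: prefixed grant covering ${groupId}-${connector}-${taskId} for every task
+    AclBinding connectorTransactionalIds = new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "my-connect-cluster-my-connector", PatternType.PREFIXED),
+            new AccessControlEntry("User:my-connector", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
+    admin.createAcls(List.of(workerTransactionalId, connectorTransactionalIds)).all().get();
+}
+</pre>
+
+    <p>The Describe ACLs and the offsets topic ACLs can be granted the same way by varying the operation and resource pattern.</p>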
+
     <h3><a id="connect_development" href="#connect_development">8.3 Connector 
Development Guide</a></h3>
 
     <p>This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.</p>
@@ -593,6 +701,109 @@ if (offset != null) {
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" 
href="#connect_exactlyoncesourceconnectors>">Exactly-once source 
connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets with [...]
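+
+    <p>As a sketch, a task that reads lines from a file might attach a source partition and offset to each record as follows; the <code>filename</code> and <code>position</code> keys, and the surrounding variables, are illustrative rather than part of the framework:</p>
+
+<pre class="brush: java;">
+Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename", filename);
+// The offset must uniquely identify this record's position so that, on restart,
+// the task can resume reading from exactly this point
+Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position", lineNumber);
+return new SourceRecord(sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, line);
+</pre>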
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which users can enable by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector, as sketched below.</p>
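+
+    <p>For example, a hypothetical connector configuration that opts in to connector-defined transaction boundaries:</p>
+
+<pre>
+name=my-source-connector
+# Let the connector, rather than the framework, define transaction boundaries
+transaction.boundary=connector</pre>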
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+    <pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+    return records;
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, doing so may be useful if files or objects in the source system are broken up into multiple source records but should be delivered atomically. It may also be useful if it is impossible to give each source record a unique source offset, as long as every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector configuration, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
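+
+    <p>As a defensive sketch, a task can guard against this case before attempting to commit; the <code>fetchRecords</code> helper here is assumed from the earlier examples:</p>
+
+<pre class="brush: java;">
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    TransactionContext transactionContext = this.context.transactionContext();
+    // Null when the user has not set transaction.boundary=connector for this connector
+    if (transactionContext != null &amp;&amp; !records.isEmpty()) {
+        transactionContext.commitTransaction(records.get(records.size() - 1));
+    }
+    return records;
+}
+</pre>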
+
+    <h6>Validation APIs</h6>
+
+    <p>Source connector developers can implement a few additional preflight validation APIs.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once delivery, it should still implement this method to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>
+
+<pre class="brush: java;">
+@Override
+public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map&lt;String, String&gt; props) {
+    // This connector can always define its own transaction boundaries
+    return ConnectorTransactionBoundaries.SUPPORTED;
+}
+</pre>
+
+    <p>This method should only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.</p>
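+
+    <p>For instance, a connector that can only define transaction boundaries in a particular mode might answer conditionally; the <code>ingest.mode</code> property here is hypothetical:</p>
+
+<pre class="brush: java;">
+@Override
+public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map&lt;String, String&gt; props) {
+    // Hypothetical property: only batch mode groups records into meaningful transactions
+    if ("batch".equals(props.get("ingest.mode"))) {
+        return ConnectorTransactionBoundaries.SUPPORTED;
+    }
+    return ConnectorTransactionBoundaries.UNSUPPORTED;
+}
+</pre>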
+
     <h4><a id="connect_dynamicio" href="#connect_dynamicio">Dynamic 
Input/Output Streams</a></h4>
 
     <p>Kafka Connect is intended to define bulk data copying jobs, such as copying an entire database rather than creating many jobs to copy each table individually. One consequence of this design is that the set of input or output streams for a connector can vary over time.</p>
