This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new dd768722649 KAFKA-13709: Add docs for exactly-once support in Connect (#12941)
dd768722649 is described below

commit dd7687226495b86b8e35dea1ca1749e71bccb2ea
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Thu Dec 15 11:20:49 2022 -0500

    KAFKA-13709: Add docs for exactly-once support in Connect (#12941)

    Reviewers: Mickael Maison <mickael.mai...@gmail.com>
---
 .../runtime/distributed/DistributedConfig.java |   9 +-
 docs/connect.html                              | 211 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9544b794114..8c75f3a4076 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -232,9 +232,12 @@ public class DistributedConfig extends WorkerConfig {
 
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
-            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. ";
-    // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
-    //  + "See the exactly-once source support documentation at [add docs link here] for more information on this feature.";
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\n"
+            + "To enable exactly-once source support on a new cluster, set this property to '" + ExactlyOnceSourceSupport.ENABLED + "'. "
+            + "To enable support on an existing cluster, first set to '" + ExactlyOnceSourceSupport.PREPARING + "' on every worker in the cluster, "
+            + "then set to '" + ExactlyOnceSourceSupport.ENABLED + "'. A rolling upgrade may be used for both changes. "
+            + "For more information on this feature, see the "
+            + "<a href=\"https://kafka.apache.org/documentation.html#connect_exactlyoncesource\">exactly-once source support documentation</a>.";
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
 
     private static Object defaultKeyGenerationAlgorithm(Crypto crypto) {
diff --git a/docs/connect.html b/docs/connect.html
index ba960ed5c22..fca18ddf7d1 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -369,6 +369,114 @@ errors.deadletterqueue.topic.name=my-connector-errors
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of, the capabilities of the Kafka Connect framework, exactly-once delivery may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, then to enable it at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
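+    <p>For example, a worker configuration that applies this to every sink connector on the worker might include the following (a minimal sketch; only the relevant property is shown):</p>
+
+<pre>
+# Ignore records that are part of aborted transactions
+consumer.isolation.level=read_committed</pre>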
+ + <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5> + + <p>If a sink connector supports exactly-once delivery, to enable exactly-once delivery at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config overri [...] + + <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5> + + <p>If a source connector supports exactly-once delivery, you must configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p> + + <h6>Worker configuration</h6> + + <p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p> + + <h6>ACL requirements</h6> + + <p>With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:</p> + + <table class="data-table"> + <thead> + <tr> + <th>Operation</th> + <th>Resource Type</th> + <th>Resource Name</th> + <th>Note</th> + </tr> + </thead> + <tbody> + <tr> + <td>Write</td> + <td>TransactionalId</td> + <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td> + <td></td> + </tr> + <tr> + <td>Describe</td> + <td>TransactionalId</td> + <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td> + <td></td> + </tr> + <tr> + <td>IdempotentWrite</td> + <td>Cluster</td> + <td>ID of the Kafka cluster that hosts the worker's config topic</td> + <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td> + </tr> + </tbody> + </table> + + <p>And the principal for each individual connector will require the following ACLs:</p> + + <table class="data-table"> + <thead> + <tr> + <th>Operation</th> + <th>Resource Type</th> + <th>Resource Name</th> + <th>Note</th> + </tr> + </thead> + <tbody> + <tr> + <td>Write</td> + <td>TransactionalId</td> + <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td> + <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td> + </tr> + <tr> + <td>Describe</td> + <td>TransactionalId</td> + <td><code>${groupId}-${connector}-${taskId}</code>, for each task 
+
+    <h6>ACL requirements</h6>
+
+    <p>With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:</p>
+
+    <table class="data-table">
+      <thead>
+        <tr>
+          <th>Operation</th>
+          <th>Resource Type</th>
+          <th>Resource Name</th>
+          <th>Note</th>
+        </tr>
+      </thead>
+      <tbody>
+        <tr>
+          <td>Write</td>
+          <td>TransactionalId</td>
+          <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+          <td></td>
+        </tr>
+        <tr>
+          <td>Describe</td>
+          <td>TransactionalId</td>
+          <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+          <td></td>
+        </tr>
+        <tr>
+          <td>IdempotentWrite</td>
+          <td>Cluster</td>
+          <td>ID of the Kafka cluster that hosts the worker's config topic</td>
+          <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+        </tr>
+      </tbody>
+    </table>
+
+    <p>And the principal for each individual connector will require the following ACLs:</p>
+
+    <table class="data-table">
+      <thead>
+        <tr>
+          <th>Operation</th>
+          <th>Resource Type</th>
+          <th>Resource Name</th>
+          <th>Note</th>
+        </tr>
+      </thead>
+      <tbody>
+        <tr>
+          <td>Write</td>
+          <td>TransactionalId</td>
+          <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+          <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
+        </tr>
+        <tr>
+          <td>Describe</td>
+          <td>TransactionalId</td>
+          <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+          <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
+        </tr>
+        <tr>
+          <td>Write</td>
+          <td>Topic</td>
+          <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offset.storage.topic</code> property in the worker’s configuration if not.</td>
+          <td></td>
+        </tr>
+        <tr>
+          <td>Read</td>
+          <td>Topic</td>
+          <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offset.storage.topic</code> property in the worker’s configuration if not.</td>
+          <td></td>
+        </tr>
+        <tr>
+          <td>Describe</td>
+          <td>Topic</td>
+          <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offset.storage.topic</code> property in the worker’s configuration if not.</td>
+          <td></td>
+        </tr>
+        <tr>
+          <td>Create</td>
+          <td>Topic</td>
+          <td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offset.storage.topic</code> property in the worker’s configuration if not.</td>
+          <td>Only necessary if the offsets topic for the connector does not exist yet</td>
+        </tr>
+        <tr>
+          <td>IdempotentWrite</td>
+          <td>Cluster</td>
+          <td>ID of the Kafka cluster that the source connector writes to</td>
+          <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+        </tr>
+      </tbody>
+    </table>
+
     <h3><a id="connect_development" href="#connect_development">8.3 Connector Development Guide</a></h3>
 
     <p>This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.</p>
@@ -593,6 +701,109 @@ if (offset != null) {
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
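+    <p>For example, a task for a line-oriented file source could use the next line number as its source offset, which lets a restarted task resume at exactly that position. This is a minimal sketch for illustration only; <code>readNextLine()</code>, <code>currentLineNumber()</code>, <code>filename</code>, and <code>topic</code> are hypothetical helpers and fields, not framework APIs:</p>
+
+<pre class="brush: java;">
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = new ArrayList<>();
+    // readNextLine() and currentLineNumber() are hypothetical helpers
+    String line = readNextLine();
+    // The source partition identifies the file; the source offset records the
+    // exact resume position, so a restart can neither drop nor duplicate lines
+    Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
+    Map<String, Object> sourceOffset = Collections.singletonMap("position", currentLineNumber());
+    records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
+    return records;
+}
+</pre>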
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries; users can enable this by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    boolean shouldCommit = false;
+    // Track how many records have been produced since the last commit
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        // Commit the transaction once this batch has been sent
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction after exactly every tenth record:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            // Commit the transaction once this record has been sent
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+    return records;
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records but should be delivered atomically. Additionally, it may be useful when it is impossible to give each source record a unique source offset, as long as every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector configuration, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
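+    <p>A task that only optionally uses this feature should therefore check for a <code>null</code> context before relying on it. A minimal sketch (reusing the hypothetical <code>fetchRecords()</code> helper from the examples above) might look like:</p>
+
+<pre class="brush: java;">
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    // transactionContext() returns null unless the user has set
+    // transaction.boundary=connector in the connector configuration
+    TransactionContext transactionContext = this.context.transactionContext();
+    if (transactionContext != null) {
+        // Commit one transaction per batch when connector-defined boundaries are enabled
+        transactionContext.commitTransaction();
+    }
+    return records;
+}
+</pre>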
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once delivery, it should still implement this method to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>
+
+<pre class="brush: java;">
+@Override
+public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
+    // This connector can always define its own transaction boundaries
+    return ConnectorTransactionBoundaries.SUPPORTED;
+}
+</pre>
+
+    <p>This method should only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.</p>
+
     <h4><a id="connect_dynamicio" href="#connect_dynamicio">Dynamic Input/Output Streams</a></h4>
 
     <p>Kafka Connect is intended to define bulk data copying jobs, such as copying an entire database rather than creating many jobs to copy each table individually. One consequence of this design is that the set of input or output streams for a connector can vary over time.</p>
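+    <p>When a change in the set of streams is detected, a connector can prompt the framework to update its tasks by calling <code>requestTaskReconfiguration()</code> on its <code>ConnectorContext</code>. A minimal sketch (assuming a hypothetical <code>newTableDetected()</code> helper that polls the source system) might look like:</p>
+
+<pre class="brush: java;">
+public class MonitoringThread extends Thread {
+    private final ConnectorContext context;
+
+    public MonitoringThread(ConnectorContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            // newTableDetected() is a hypothetical helper that polls the source system
+            if (newTableDetected()) {
+                // Ask the framework to reconfigure and restart this connector's tasks
+                context.requestTaskReconfiguration();
+            }
+        }
+    }
+}
+</pre>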