showuon commented on a change in pull request #11775: URL: https://github.com/apache/kafka/pull/11775#discussion_r819316799
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,46 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * + * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are + * strongly encouraged to override this method to return a non-null value such as + * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}. + * + * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the + * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support. + * + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given + * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a + * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide + * exactly-once guarantees. Review comment: I don't think we need this sentence: `If this method is overridden by a connector...`, because we still allow a `null` return value, right? I think we just need to clearly list the three possible return values here: `supported`, `unsupported`, and `null`. WDYT? 
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,46 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * + * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are + * strongly encouraged to override this method to return a non-null value such as + * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}. + * + * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the + * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support. + * + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given + * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a + * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide + * exactly-once guarantees. + * @since 3.2 + */ + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) { + return null; + } + + /** + * Signals whether the connector implementation is capable of defining the transaction boundaries for a + * connector with the given configuration. This method is called before {@link #start(Map)}, only when the + * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}. 
+ * + * <p>This method need not be implemented if the connector implementation does not support defining + * transaction boundaries. + * + * @param connectorConfig the configuration that will be used for the connector + * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries, + * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a + * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot define its own + * transaction boundaries. Review comment: Same as above ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -16,17 +16,63 @@ */ package org.apache.kafka.connect.source; -import org.apache.kafka.connect.connector.Task; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.connect.connector.Task; import java.util.List; +import java.util.Locale; import java.util.Map; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + /** + * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property. + */ + public enum TransactionBoundary { + /** + * A new transaction will be started and committed for every batch of records returned by {@link #poll()}. + */ + POLL, + /** + * Transactions will be started and committed on a user-defined time interval. + */ + INTERVAL, + /** + * Transactions will be defined by the connector itself, via a {@link TransactionContext}. 
+ */ + CONNECTOR; + + /** + * The default transaction boundary style that will be used for source connectors when no style is explicitly + * configured. + */ + public static final TransactionBoundary DEFAULT = POLL; + + /** + * Parse a {@link TransactionBoundary} from the given string. + * @param property the string to parse; should not be null + * @return the {@link TransactionBoundary} whose name matches the given string + * @throws IllegalArgumentException if there is no transaction boundary type with the given name + */ + public static TransactionBoundary fromProperty(String property) { + return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim()); Review comment: Should we add a null check at the beginning? i.e. `Objects.requireNonNull()` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ########## @@ -38,4 +38,29 @@ * Get the OffsetStorageReader for this SourceTask. */ OffsetStorageReader offsetStorageReader(); + + /** + * Get a {@link TransactionContext} that can be used to define producer transaction boundaries + * when exactly-once support is enabled for the connector. + * + * <p>This method was added in Apache Kafka 3.2. Source tasks that use this method but want to + * maintain backward compatibility so they can also be deployed to older Connect runtimes + * should guard the call to this method with a try-catch block, since calling this method will result in a Review comment: I think the whole paragraph can be put as below to make it more readable: <p>When the source connector is deployed to Connect runtimes older than Kafka 3.2, remember to guard with a try-catch block since it will result in a {@link NoSuchMethodException} or {@link NoClassDefFoundError}. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -192,6 +197,33 @@ public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests"; public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT); + private enum ExactlyOnceSourceSupport { + DISABLED(false), + PREPARING(true), + ENABLED(true); + + public final boolean usesTransactionalLeader; + + ExactlyOnceSourceSupport(boolean usesTransactionalLeader) { + this.usesTransactionalLeader = usesTransactionalLeader; + } + + public static ExactlyOnceSourceSupport fromProperty(String property) { + return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT)); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support"; + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster " + + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. " + + "See the exactly-once source support documentation at " /* TODO: Add docs link */ + " for more information on this feature."; Review comment: Could you first comment out the line: `See the exactly-once source support doc...` because we might accidentally release with this empty doc line. Also, could you add the jira ticket number to this docs link todo? So that we know which doc you're referring to. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java ########## @@ -47,6 +57,57 @@ + "created by source connectors"; private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups"; + protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once Support"; + + public enum ExactlyOnceSupportLevel { + REQUESTED, + REQUIRED; + + public static ExactlyOnceSupportLevel fromProperty(String property) { + return valueOf(property.toUpperCase(Locale.ROOT).trim()); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support"; + private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". " + + "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once delivery " + + "with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to " + + "Connect that they support this; in that case, documentation for the connector should be consulted carefully before " + + "creating it, and the value for this property should be set to \"" + REQUESTED + "\". " + + "Additionally, if the value is set to \"" + REQUIRED + "\" but the worker that performs preflight validation does not have " + + "exactly-once support enabled for source connectors, requests to create or validate the connector will fail."; + private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once support"; + + public static final String TRANSACTION_BOUNDARY_CONFIG = SourceTask.TRANSACTION_BOUNDARY_CONFIG; + private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". 
" + + "If set to '" + POLL + "', a new producer transaction will be started and committed for every batch of records that each task from " + + "this connector provides to Connect. If set to '" + CONNECTOR + "', relies on connector-defined transaction boundaries; note that " + + "not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to instantiate a connector with " + + "this value will fail. Finally, if set to '" + INTERVAL + "', commits transactions only after a user-defined time interval has passed."; + private static final String TRANSACTION_BOUNDARY_DISPLAY = "Transaction Boundary"; + + public static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG = "transaction.boundary.interval.ms"; + private static final String TRANSACTION_BOUNDARY_INTERVAL_DOC = "If '" + TRANSACTION_BOUNDARY_CONFIG + "' is set to '" + INTERVAL + + "', determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level " + + "'" + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG + "' property. Has no effect if a different " + + TRANSACTION_BOUNDARY_CONFIG + " is specified."; Review comment: nit: **It** has no effect if a different TRANSACTION_BOUNDARY_CONFIG is specified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org