showuon commented on a change in pull request #11775:
URL: https://github.com/apache/kafka/pull/11775#discussion_r819316799



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.

Review comment:
       I don't think we need this sentence: `If this method is overridden by a 
connector...`, because we still allow a `null` return value, right? I think we 
just need to clearly describe the 3 possible return values here: `supported`, 
`unsupported`, and `null`. WDYT?
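
   To make the three outcomes concrete, here is a minimal, self-contained
   sketch. The enum and both connector classes are local stand-ins, not the
   real Connect API types, and the `mode` property is a hypothetical
   connector setting used only for illustration. It assumes, as the Javadoc
   in the diff says, that `null` is treated the same as `UNSUPPORTED`.

```java
import java.util.Map;

// Sketch only: every type here is a local stand-in for the Connect API
// types discussed in this PR, so the example compiles on its own.
class ExactlyOnceSupportSketch {
    enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

    // Mirrors the default in the diff: returns null for backwards compatibility.
    static class LegacyConnector {
        ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
            return null;
        }
    }

    // Hypothetical connector whose support depends on its configuration.
    static class ConfigAwareConnector extends LegacyConnector {
        @Override
        ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
            // "mode" is an invented property, used only for this illustration.
            return "single".equals(connectorConfig.get("mode"))
                    ? ExactlyOnceSupport.SUPPORTED
                    : ExactlyOnceSupport.UNSUPPORTED;
        }
    }

    // A caller that treats null exactly like UNSUPPORTED.
    static boolean canProvideExactlyOnce(LegacyConnector connector, Map<String, String> config) {
        return connector.exactlyOnceSupport(config) == ExactlyOnceSupport.SUPPORTED;
    }

    public static void main(String[] args) {
        Map<String, String> single = Map.of("mode", "single");
        Map<String, String> multi = Map.of("mode", "multi");
        System.out.println(canProvideExactlyOnce(new LegacyConnector(), single));      // null case
        System.out.println(canProvideExactlyOnce(new ConfigAwareConnector(), single)); // SUPPORTED case
        System.out.println(canProvideExactlyOnce(new ConfigAwareConnector(), multi));  // UNSUPPORTED case
    }
}
```

   Documenting the return value as these three cases keeps the Javadoc in
   line with what a caller actually has to handle.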

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;
+    }
+
+    /**
+     * Signals whether the connector implementation is capable of defining the transaction boundaries for a
+     * connector with the given configuration. This method is called before {@link #start(Map)}, only when the
+     * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
+     *
+     * <p>This method need not be implemented if the connector implementation does not support defining
+     * transaction boundaries.
+     *
+     * @param connectorConfig the configuration that will be used for the connector
+     * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
+     * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot define its own
+     * transaction boundaries.

Review comment:
       Same as above

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -16,17 +16,63 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    /**
+     * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
+     */
+    public enum TransactionBoundary {
+        /**
+         * A new transaction will be started and committed for every batch of records returned by {@link #poll()}.
+         */
+        POLL,
+        /**
+         * Transactions will be started and committed on a user-defined time interval.
+         */
+        INTERVAL,
+        /**
+         * Transactions will be defined by the connector itself, via a {@link TransactionContext}.
+         */
+        CONNECTOR;
+
+        /**
+         * The default transaction boundary style that will be used for source connectors when no style is explicitly
+         * configured.
+         */
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        /**
+         * Parse a {@link TransactionBoundary} from the given string.
+         * @param property the string to parse; should not be null
+         * @return the {@link TransactionBoundary} whose name matches the given string
+         * @throws IllegalArgumentException if there is no transaction boundary type with the given name
+         */
+        public static TransactionBoundary fromProperty(String property) {
+            return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());

Review comment:
       Should we add a null check at the beginning? i.e. 
`Objects.requireNonNull()`
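
   Roughly what I have in mind, as a self-contained sketch. The enum below
   is a local copy that only mirrors the constants in the diff, and the
   error message text is just a suggestion. Without the check, a `null`
   property already fails with a `NullPointerException` from
   `toUpperCase`, so the explicit check mainly buys a clearer message.

```java
import java.util.Locale;
import java.util.Objects;

// Sketch only: a local copy of the enum from the diff, with the proposed null check.
class TransactionBoundarySketch {
    enum TransactionBoundary {
        POLL, INTERVAL, CONNECTOR;

        static TransactionBoundary fromProperty(String property) {
            // Fail fast with a descriptive message instead of an NPE from toUpperCase.
            Objects.requireNonNull(property, "Value for transaction boundary must be non-null");
            // valueOf throws IllegalArgumentException for names that match no constant.
            return valueOf(property.trim().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        System.out.println(TransactionBoundary.fromProperty(" poll "));
        try {
            TransactionBoundary.fromProperty(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```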

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
##########
@@ -38,4 +38,29 @@
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
+
+    /**
+     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
+     * when exactly-once support is enabled for the connector.
+     *
+     * <p>This method was added in Apache Kafka 3.2. Source tasks that use this method but want to
+     * maintain backward compatibility so they can also be deployed to older Connect runtimes
+     * should guard the call to this method with a try-catch block, since calling this method will result in a

Review comment:
       I think the whole paragraph can be rewritten as below to make it more 
readable: 
   
   <p>When the source connector is deployed to Connect runtimes older than 
Kafka 3.2, remember to guard the call to this method with a try-catch block, 
since calling it will result in a {@link NoSuchMethodException} or 
{@link NoClassDefFoundError}.
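
   For reference, the guard could look something like the sketch below. The
   two interfaces are minimal local stand-ins for `SourceTaskContext` and
   `TransactionContext`, and the "old runtime" is simulated by throwing
   the error by hand. Note that the sketch catches `NoSuchMethodError`,
   since that is the `LinkageError` the JVM raises when a compiled call
   targets a method that is missing at runtime (`NoSuchMethodException` is
   the checked exception thrown by reflective lookups).

```java
// Sketch only: minimal stand-ins for the Connect interfaces, so that the
// guard pattern can be compiled and run in isolation.
class TransactionContextGuardSketch {
    interface TransactionContext {
        void commitTransaction();
    }

    interface SourceTaskContext {
        TransactionContext transactionContext();
    }

    // Returns the transaction context, or null when the runtime predates the method.
    static TransactionContext transactionContextOrNull(SourceTaskContext context) {
        try {
            return context.transactionContext();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: fall back to behaviour without connector-defined boundaries.
            return null;
        }
    }

    public static void main(String[] args) {
        // Simulates a runtime whose SourceTaskContext lacks the new method.
        SourceTaskContext oldRuntime = () -> {
            throw new NoSuchMethodError("transactionContext");
        };
        TransactionContext noOp = () -> { };
        SourceTaskContext newRuntime = () -> noOp;

        System.out.println(transactionContextOrNull(oldRuntime) == null);
        System.out.println(transactionContextOrNull(newRuntime) == noOp);
    }
}
```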

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +197,33 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. "
+            + "See the exactly-once source support documentation at " /* TODO: Add docs link */ + " for more information on this feature.";

Review comment:
       Could you comment out the line `See the exactly-once source support 
doc...` for now? Otherwise we might accidentally release with this empty docs 
link.
   Also, could you add the JIRA ticket number to this docs link TODO, so that 
we know which doc you're referring to?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -47,6 +57,57 @@
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
 
+    protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once Support";
+
+    public enum ExactlyOnceSupportLevel {
+        REQUESTED,
+        REQUIRED;
+
+        public static ExactlyOnceSupportLevel fromProperty(String property) {
+            return valueOf(property.toUpperCase(Locale.ROOT).trim());
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
+    private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
+            + "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once delivery "
+            + "with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to "
+            + "Connect that they support this; in that case, documentation for the connector should be consulted carefully before "
+            + "creating it, and the value for this property should be set to \"" + REQUESTED + "\". "
+            + "Additionally, if the value is set to \"" + REQUIRED + "\" but the worker that performs preflight validation does not have "
+            + "exactly-once support enabled for source connectors, requests to create or validate the connector will fail.";
+    private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once support";
+
+    public static final String TRANSACTION_BOUNDARY_CONFIG = SourceTask.TRANSACTION_BOUNDARY_CONFIG;
+    private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". "
+            + "If set to '" + POLL + "', a new producer transaction will be started and committed for every batch of records that each task from "
+            + "this connector provides to Connect. If set to '" + CONNECTOR + "', relies on connector-defined transaction boundaries; note that "
+            + "not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to instantiate a connector with "
+            + "this value will fail. Finally, if set to '" + INTERVAL + "', commits transactions only after a user-defined time interval has passed.";
+    private static final String TRANSACTION_BOUNDARY_DISPLAY = "Transaction Boundary";
+
+    public static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG = "transaction.boundary.interval.ms";
+    private static final String TRANSACTION_BOUNDARY_INTERVAL_DOC = "If '" + TRANSACTION_BOUNDARY_CONFIG + "' is set to '" + INTERVAL
+            + "', determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level "
+            + "'" + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG + "' property. Has no effect if a different "
+            + TRANSACTION_BOUNDARY_CONFIG + " is specified.";

Review comment:
       nit: **It** has no effect if a different TRANSACTION_BOUNDARY_CONFIG is 
specified.
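
   As an aside, the fallback this doc string describes could be sketched as
   below. This is only an illustration of the documented behaviour, not the
   code in the PR: the helper name is hypothetical, and the worker-level
   offset commit interval is passed in as a plain `long` rather than read
   from `WorkerConfig`. The helper is meant to be consulted only when the
   transaction boundary is set to `interval`.

```java
import java.util.Map;

// Sketch only: illustrates the "if unset, defaults to the worker-level value" rule.
class TransactionIntervalSketch {
    static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG = "transaction.boundary.interval.ms";

    // Hypothetical helper: picks the connector-level interval when present,
    // otherwise falls back to the worker-level offset commit interval.
    static long transactionIntervalMs(Map<String, String> connectorConfig, long workerOffsetCommitIntervalMs) {
        String configured = connectorConfig.get(TRANSACTION_BOUNDARY_INTERVAL_CONFIG);
        return configured == null
                ? workerOffsetCommitIntervalMs
                : Long.parseLong(configured.trim());
    }

    public static void main(String[] args) {
        long workerInterval = 60_000L; // illustrative worker-level value
        System.out.println(transactionIntervalMs(Map.of(), workerInterval));
        System.out.println(transactionIntervalMs(
                Map.of(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, "5000"), workerInterval));
    }
}
```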




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

