C0urante commented on a change in pull request #11775:
URL: https://github.com/apache/kafka/pull/11775#discussion_r819945829



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -16,17 +16,63 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    /**
+     * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
+     */
+    public enum TransactionBoundary {
+        /**
+         * A new transaction will be started and committed for every batch of records returned by {@link #poll()}.
+         */
+        POLL,
+        /**
+         * Transactions will be started and committed on a user-defined time interval.
+         */
+        INTERVAL,
+        /**
+         * Transactions will be defined by the connector itself, via a {@link TransactionContext}.
+         */
+        CONNECTOR;
+
+        /**
+         * The default transaction boundary style that will be used for source connectors when no style is explicitly
+         * configured.
+         */
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        /**
+         * Parse a {@link TransactionBoundary} from the given string.
+         * @param property the string to parse; should not be null
+         * @return the {@link TransactionBoundary} whose name matches the given string
+         * @throws IllegalArgumentException if there is no transaction boundary type with the given name
+         */
+        public static TransactionBoundary fromProperty(String property) {
+            return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());

Review comment:
       Good idea! This is unlikely to come up in practice given the preflight
validations around the `transaction.boundary` property, but a human-readable
error message is always preferable to a generic NPE.
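
       For context, the kind of guard being discussed could look roughly like
the sketch below. It is a minimal illustration, not necessarily the change that
ends up in the PR: the wrapper class `TransactionBoundaryDemo` and the exact
message text are assumptions made for the example, while
`Objects.requireNonNull` is the standard JDK call. Note that it still throws a
`NullPointerException`, only with a descriptive message attached.

```java
import java.util.Locale;
import java.util.Objects;

// Hypothetical wrapper class so that the sketch compiles on its own.
final class TransactionBoundaryDemo {

    enum TransactionBoundary {
        POLL, INTERVAL, CONNECTOR;

        static TransactionBoundary fromProperty(String property) {
            // Fail fast with a readable message instead of a bare NPE
            // raised by property.toUpperCase(). Message text is illustrative.
            Objects.requireNonNull(property, "Value for transaction boundary property may not be null");
            return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
        }
    }

    public static void main(String[] args) {
        // A valid value is parsed case-insensitively.
        System.out.println(TransactionBoundary.fromProperty("interval"));
        try {
            TransactionBoundary.fromProperty(null);
        } catch (NullPointerException e) {
            // The exception now carries the descriptive message.
            System.out.println(e.getMessage());
        }
    }
}
```

       An unknown name such as `"batch"` is unaffected by the guard and still
surfaces as an `IllegalArgumentException` from `valueOf`, as the Javadoc on
`fromProperty` describes.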



