tombentley commented on a change in pull request #11775:
URL: https://github.com/apache/kafka/pull/11775#discussion_r818522400



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -47,6 +57,57 @@
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
 
+    protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once Support";
+
+    public enum ExactlyOnceSupportLevel {
+        REQUESTED,
+        REQUIRED;
+
+        public static ExactlyOnceSupportLevel fromProperty(String property) {
+            return valueOf(property.toUpperCase(Locale.ROOT).trim());
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
+    private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
+            + "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once delivery "
+            + "with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to "
+            + "Connect that they support this; in that case, documentation for the connector should be consulted carefully before "
+            + "creating it, and the value for this property should be set to \"" + REQUESTED + "\". "
+            + "Additionally, if the value is set to \"" + REQUIRED + "\" but the worker that performs preflight validation does not have "
+            + "exactly-once support enabled for source connectors, requests to create or validate the connector will fail.";
+    private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once support";
+
+    public static final String TRANSACTION_BOUNDARY_CONFIG = SourceTask.TRANSACTION_BOUNDARY_CONFIG;
+    private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". "
+            + "If set to '" + POLL + "', a new producer transaction will be started and committed for every batch of records that each task from "
+            + "this connector provides to Connect. If set to '" + CONNECTOR + "', relies on connector-defined transaction boundaries; note that "
+            + "not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with "

Review comment:
       ```suggestion
               + "not all connectors are capable of defining their own 
transaction boundaries, and in that case, attempts to instantiate a connector 
with "
   ```
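For reference, a source connector configuration that exercises the two properties documented in this hunk might look like the sketch below. The connector name and class are hypothetical, and the literal `transaction.boundary` key is assumed to be the value of `SourceTask.TRANSACTION_BOUNDARY_CONFIG`.

```properties
# Hypothetical source connector configuration (standalone-style properties).
name=example-source
connector.class=org.example.ExampleSourceConnector
# Fail preflight validation unless the connector can provide exactly-once delivery
# with this configuration.
exactly.once.support=required
# Start and commit a producer transaction for every batch of records returned from poll().
transaction.boundary=poll
```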

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -58,23 +119,87 @@ public Object get(String key) {
         }
     }
 
-    private static final ConfigDef CONFIG = SourceConnectorConfig.configDef();
+    private final TransactionBoundary transactionBoundary;
+    private final Long transactionBoundaryInterval;
     private final EnrichedSourceConnectorConfig enrichedSourceConfig;
+    private final String offsetsTopic;
 
     public static ConfigDef configDef() {
+        ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
         int orderInGroup = 0;
         return new ConfigDef(ConnectorConfig.configDef())
-                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
-                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                .define(
+                        TOPIC_CREATION_GROUPS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(
+                                new ConfigDef.NonNullValidator(),
+                                ConfigDef.LambdaValidator.with(
+                                    (name, value) -> {
+                                        List<?> groupAliases = (List<?>) value;
+                                        if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                            throw new ConfigException(name, value, "Duplicate alias provided.");
+                                        }
+                                    },
+                                    () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW,
+                        TOPIC_CREATION_GROUPS_DOC,
+                        TOPIC_CREATION_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.LONG,
+                        TOPIC_CREATION_GROUPS_DISPLAY)
+                .define(
+                        EXACTLY_ONCE_SUPPORT_CONFIG,
+                        ConfigDef.Type.STRING,
+                        REQUESTED.toString(),
+                        ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSupportLevel.class)),

Review comment:
       Not for this PR, but looking at this made me realise that ConfigDef 
could benefit from a `Validator` specifically for enums. There's currently 
inconsistency around case sensitivity, for instance. And using `enumOptions` 
more widely would simplify other call sites which typically list all the enum 
members. Wdyt?
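A generic validator along those lines might look like the following sketch; the `EnumValidator` class name and its `in(...)` factory are hypothetical and not part of `ConfigDef` today.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical case-insensitive validator for enum-backed configs; name and API
// are illustrative only.
public class EnumValidator<E extends Enum<E>> implements ConfigDef.Validator {

    private final Class<E> enumClass;

    private EnumValidator(Class<E> enumClass) {
        this.enumClass = enumClass;
    }

    public static <E extends Enum<E>> EnumValidator<E> in(Class<E> enumClass) {
        return new EnumValidator<>(enumClass);
    }

    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            throw new ConfigException(name, null, "Value must be one of (case insensitive): " + this);
        }
        try {
            // Normalize the same way the enums in this PR do: trim and upper-case.
            Enum.valueOf(enumClass, value.toString().trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new ConfigException(name, value, "Value must be one of (case insensitive): " + this);
        }
    }

    @Override
    public String toString() {
        // Render the options in lower case, matching the toString() of the enums above.
        return Arrays.stream(enumClass.getEnumConstants())
                .map(c -> c.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(", "));
    }
}
```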

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -396,13 +441,58 @@
                     ConfigDef.Importance.LOW,
                     INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
 
+    private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
+
     @Override
     public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED;
+    }
+
+    /**
+     * @return whether the Connect cluster's leader should use a transactional producer to perform writes to the config
+     * topic, which is useful for ensuring that zombie leaders are fenced out and unable to write to the topic after a
+     * new leader has been elected.
+     */
+    public boolean transactionalLeaderEnabled() {
+        return exactlyOnceSourceSupport.usesTransactionalLeader;
+    }
+
+    /**
+     * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional ID} to use for the worker's producer if
+     * the worker is the leader of the cluster and is
+     * {@link #transactionalLeaderEnabled() configured to use a transactional producer}.

Review comment:
       This doc is a little misleading because it returns a value whether or not the worker is the leader and whether or not the transactional leader is enabled. I know you mean the method should only be called in those cases, but perhaps return null outside of those cases, or alternatively reword the doc?
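A minimal sketch of the null-returning alternative, assuming a hypothetical method name and an existing `transactionalProducerId()` accessor on this class:

```java
// Sketch only: the method name, the isLeader parameter, and the
// transactionalProducerId() accessor are assumptions for illustration.
public String transactionalProducerIdIfApplicable(boolean isLeader) {
    if (!isLeader || !transactionalLeaderEnabled()) {
        // Not the leader, or transactional config-topic writes are disabled:
        // tell callers that no transactional ID applies.
        return null;
    }
    return transactionalProducerId();
}
```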

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +197,40 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. "
+            + "If this feature is enabled, consumers of the topics to which exactly-once source connectors write should be configured with the "
+            + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " property set to '" + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT) + "'. "
+            + "Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, "
+            + "and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled. "
+            + "Permitted values are \"" + ExactlyOnceSourceSupport.DISABLED + "\", \"" + ExactlyOnceSourceSupport.PREPARING + "\", and \"" + ExactlyOnceSourceSupport.ENABLED + "\". "
+            + "In order to safely enable exactly-once support for source connectors, "
+            + "all workers in the cluster must first be updated to use the \"preparing\" value for this property. "
+            + "Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to \"enabled\".";

Review comment:
       Should this be included here, or should it refer to a dedicated section in the connect docs? I guess there are two cases: bootstrapping a whole new connect cluster, or upgrading an existing one. For the bootstrapping case it's not completely clear whether the "preparing" round is required.
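For illustration, the two-phase rollout described in the doc string amounts to the following change in each worker's properties file (values taken from the doc above; whether the "preparing" phase is needed when bootstrapping a brand-new cluster is exactly the open question here):

```properties
# Phase 1: perform a rolling update so every worker in the cluster uses this value.
exactly.once.source.support=preparing

# Phase 2: once all workers are running with "preparing", perform a second rolling
# update to switch every worker to:
# exactly.once.source.support=enabled
```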




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

