This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7268284699f KAFKA-10000: Add all public-facing config properties
(#11775)
7268284699f is described below
commit 7268284699f84c7f61f5656167c36877b72d27f2
Author: Chris Egerton <[email protected]>
AuthorDate: Thu May 12 02:45:53 2022 -0400
KAFKA-10000: Add all public-facing config properties (#11775)
Reviewers: Luke Chen <[email protected]>, Tom Bentley
<[email protected]>, Andrew Eugene Choi <[email protected]>
---
.../kafka/connect/source/SourceConnector.java | 5 +-
.../apache/kafka/connect/source/SourceTask.java | 2 +
.../connect/runtime/SourceConnectorConfig.java | 168 +++++++++++++++++++--
.../apache/kafka/connect/runtime/WorkerConfig.java | 60 +++++++-
.../runtime/distributed/DistributedConfig.java | 81 ++++++++++
.../kafka/connect/runtime/AbstractHerderTest.java | 34 +++--
.../runtime/distributed/DistributedConfigTest.java | 43 +++++-
7 files changed, 365 insertions(+), 28 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 7fc2a5d11cf..3bd012f9fbc 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -63,9 +63,8 @@ public abstract class SourceConnector extends Connector {
*
* @param connectorConfig the configuration that will be used for the
connector
* @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the
connector will define its own transaction boundaries,
- * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If
this method is overridden by a
- * connector, should not be {@code null}, but if {@code null}, it will be
assumed that the connector cannot define its own
- * transaction boundaries.
+ * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise; may
never be {@code null}. The default implementation
+ * returns {@link ConnectorTransactionBoundaries#UNSUPPORTED}.
* @since 3.3
* @see TransactionContext
*/
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 2159e68e8a8..559f02340ca 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.Task;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
/**
* SourceTask is a Task that pulls records from another system for storage in
Kafka.
@@ -64,6 +65,7 @@ public abstract class SourceTask implements Task {
* @throws IllegalArgumentException if there is no transaction
boundary type with the given name
*/
public static TransactionBoundary fromProperty(String property) {
+ Objects.requireNonNull(property, "Value for transaction boundary
property may not be null");
return
TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index e38072b9b6e..65d376b08f7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -20,21 +20,31 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.source.SourceTask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED;
+import static
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
+import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.DEFAULT;
+import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
+import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
public class SourceConnectorConfig extends ConnectorConfig {
@@ -47,6 +57,57 @@ public class SourceConnectorConfig extends ConnectorConfig {
+ "created by source connectors";
private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic
Creation Groups";
+ protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once
Support";
+
+ public enum ExactlyOnceSupportLevel {
+ REQUESTED,
+ REQUIRED;
+
+ public static ExactlyOnceSupportLevel fromProperty(String property) {
+ return valueOf(property.toUpperCase(Locale.ROOT).trim());
+ }
+
+ @Override
+ public String toString() {
+ return name().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ public static final String EXACTLY_ONCE_SUPPORT_CONFIG =
"exactly.once.support";
+ private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values
are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
+ + "If set to \"" + REQUIRED + "\", forces a preflight check for
the connector to ensure that it can provide exactly-once delivery "
+ + "with the given configuration. Some connectors may be capable of
providing exactly-once delivery but not signal to "
+ + "Connect that they support this; in that case, documentation for
the connector should be consulted carefully before "
+ + "creating it, and the value for this property should be set to
\"" + REQUESTED + "\". "
+ + "Additionally, if the value is set to \"" + REQUIRED + "\" but
the worker that performs preflight validation does not have "
+ + "exactly-once support enabled for source connectors, requests to
create or validate the connector will fail.";
+ private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once
support";
+
+ public static final String TRANSACTION_BOUNDARY_CONFIG =
SourceTask.TRANSACTION_BOUNDARY_CONFIG;
+ private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values
are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". "
+ + "If set to '" + POLL + "', a new producer transaction will be
started and committed for every batch of records that each task from "
+ + "this connector provides to Connect. If set to '" + CONNECTOR +
"', relies on connector-defined transaction boundaries; note that "
+ + "not all connectors are capable of defining their own
transaction boundaries, and in that case, attempts to instantiate a connector
with "
+ + "this value will fail. Finally, if set to '" + INTERVAL + "',
commits transactions only after a user-defined time interval has passed.";
+ private static final String TRANSACTION_BOUNDARY_DISPLAY = "Transaction
Boundary";
+
+ public static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG =
"transaction.boundary.interval.ms";
+ private static final String TRANSACTION_BOUNDARY_INTERVAL_DOC = "If '" +
TRANSACTION_BOUNDARY_CONFIG + "' is set to '" + INTERVAL
+ + "', determines the interval for producer transaction commits by
connector tasks. If unset, defaults to the value of the worker-level "
+ + "'" + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG + "'
property. It has no effect if a different "
+ + TRANSACTION_BOUNDARY_CONFIG + " is specified.";
+ private static final String TRANSACTION_BOUNDARY_INTERVAL_DISPLAY =
"Transaction boundary interval";
+
+ protected static final String OFFSETS_TOPIC_GROUP = "offsets.topic";
+
+ public static final String OFFSETS_TOPIC_CONFIG = "offsets.storage.topic";
+ private static final String OFFSETS_TOPIC_DOC = "The name of a separate
offsets topic to use for this connector. "
+ + "If empty or not specified, the worker’s global offsets topic
name will be used. "
+ + "If specified, the offsets topic will be created if it does not
already exist on the Kafka cluster targeted by this connector "
+ + "(which may be different from the one used for the worker's
global offsets topic if the bootstrap.servers property of the connector's
producer "
+ + "has been overridden from the worker's). Only applicable in
distributed mode; in standalone mode, setting this property will have no
effect.";
+ private static final String OFFSETS_TOPIC_DISPLAY = "Offsets topic";
+
private static class EnrichedSourceConnectorConfig extends ConnectorConfig
{
EnrichedSourceConnectorConfig(Plugins plugins, ConfigDef configDef,
Map<String, String> props) {
super(plugins, configDef, props);
@@ -58,23 +119,87 @@ public class SourceConnectorConfig extends ConnectorConfig
{
}
}
- private static final ConfigDef CONFIG = SourceConnectorConfig.configDef();
+ private final TransactionBoundary transactionBoundary;
+ private final Long transactionBoundaryInterval;
private final EnrichedSourceConnectorConfig enrichedSourceConfig;
+ private final String offsetsTopic;
public static ConfigDef configDef() {
+ ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
int orderInGroup = 0;
return new ConfigDef(ConnectorConfig.configDef())
- .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST,
Collections.emptyList(),
- ConfigDef.CompositeValidator.of(new
ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+ .define(
+ TOPIC_CREATION_GROUPS_CONFIG,
+ ConfigDef.Type.LIST,
+ Collections.emptyList(),
+ ConfigDef.CompositeValidator.of(
+ new ConfigDef.NonNullValidator(),
+ ConfigDef.LambdaValidator.with(
+ (name, value) -> {
+ List<?> groupAliases = (List<?>) value;
+ if (groupAliases.size() > new
HashSet<>(groupAliases).size()) {
+ throw new ConfigException(name,
value, "Duplicate alias provided.");
+ }
+ },
+ () -> "unique topic creation groups")),
+ ConfigDef.Importance.LOW,
+ TOPIC_CREATION_GROUPS_DOC,
+ TOPIC_CREATION_GROUP,
+ ++orderInGroup,
+ ConfigDef.Width.LONG,
+ TOPIC_CREATION_GROUPS_DISPLAY)
+ .define(
+ EXACTLY_ONCE_SUPPORT_CONFIG,
+ ConfigDef.Type.STRING,
+ REQUESTED.toString(),
+
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSupportLevel.class)),
+ ConfigDef.Importance.MEDIUM,
+ EXACTLY_ONCE_SUPPORT_DOC,
+ EXACTLY_ONCE_SUPPORT_GROUP,
+ ++orderInGroup,
+ ConfigDef.Width.SHORT,
+ EXACTLY_ONCE_SUPPORT_DISPLAY)
+ .define(
+ TRANSACTION_BOUNDARY_CONFIG,
+ ConfigDef.Type.STRING,
+ DEFAULT.toString(),
+
ConfigDef.CaseInsensitiveValidString.in(enumOptions(TransactionBoundary.class)),
+ ConfigDef.Importance.MEDIUM,
+ TRANSACTION_BOUNDARY_DOC,
+ EXACTLY_ONCE_SUPPORT_GROUP,
+ ++orderInGroup,
+ ConfigDef.Width.SHORT,
+ TRANSACTION_BOUNDARY_DISPLAY)
+ .define(
+ TRANSACTION_BOUNDARY_INTERVAL_CONFIG,
+ ConfigDef.Type.LONG,
+ null,
+ ConfigDef.LambdaValidator.with(
(name, value) -> {
- List<?> groupAliases = (List<?>) value;
- if (groupAliases.size() > new
HashSet<>(groupAliases).size()) {
- throw new ConfigException(name, value,
"Duplicate alias provided.");
+ if (value == null) {
+ return;
}
+ atLeastZero.ensureValid(name, value);
},
- () -> "unique topic creation groups")),
- ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC,
TOPIC_CREATION_GROUP,
- ++orderInGroup, ConfigDef.Width.LONG,
TOPIC_CREATION_GROUPS_DISPLAY);
+ atLeastZero::toString
+ ),
+ ConfigDef.Importance.LOW,
+ TRANSACTION_BOUNDARY_INTERVAL_DOC,
+ EXACTLY_ONCE_SUPPORT_GROUP,
+ ++orderInGroup,
+ ConfigDef.Width.SHORT,
+ TRANSACTION_BOUNDARY_INTERVAL_DISPLAY)
+ .define(
+ OFFSETS_TOPIC_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.LOW,
+ OFFSETS_TOPIC_DOC,
+ OFFSETS_TOPIC_GROUP,
+ orderInGroup = 1,
+ ConfigDef.Width.LONG,
+ OFFSETS_TOPIC_DISPLAY);
}
public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
@@ -116,9 +241,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
}
public SourceConnectorConfig(Plugins plugins, Map<String, String> props,
boolean createTopics) {
- super(plugins, CONFIG, props);
+ super(plugins, configDef(), props);
if (createTopics && props.entrySet().stream().anyMatch(e ->
e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
- ConfigDef defaultConfigDef = embedDefaultGroup(CONFIG);
+ ConfigDef defaultConfigDef = embedDefaultGroup(configDef());
// This config is only used to set default values for partitions
and replication
// factor from the default group and otherwise it remains unused
AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef,
props, false);
@@ -135,6 +260,13 @@ public class SourceConnectorConfig extends ConnectorConfig
{
} else {
enrichedSourceConfig = null;
}
+ transactionBoundary =
TransactionBoundary.fromProperty(getString(TRANSACTION_BOUNDARY_CONFIG));
+ transactionBoundaryInterval =
getLong(TRANSACTION_BOUNDARY_INTERVAL_CONFIG);
+ offsetsTopic = getString(OFFSETS_TOPIC_CONFIG);
+ }
+
+ public static boolean usesTopicCreation(Map<String, String> props) {
+ return props.entrySet().stream().anyMatch(e ->
e.getKey().startsWith(TOPIC_CREATION_PREFIX));
}
@Override
@@ -142,6 +274,18 @@ public class SourceConnectorConfig extends ConnectorConfig
{
return enrichedSourceConfig != null ? enrichedSourceConfig.get(key) :
super.get(key);
}
+ public TransactionBoundary transactionBoundary() {
+ return transactionBoundary;
+ }
+
+ public Long transactionBoundaryInterval() {
+ return transactionBoundaryInterval;
+ }
+
+ public String offsetsTopic() {
+ return offsetsTopic;
+ }
+
/**
* Returns whether this configuration uses topic creation properties.
*
@@ -181,6 +325,6 @@ public class SourceConnectorConfig extends ConnectorConfig {
}
public static void main(String[] args) {
- System.out.println(CONFIG.toHtml(4, config ->
"sourceconnectorconfigs_" + config));
+ System.out.println(configDef().toHtml(4, config ->
"sourceconnectorconfigs_" + config));
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 5894283c935..38dbeb87e1b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -111,7 +111,8 @@ public class WorkerConfig extends AbstractConfig {
private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
= "Maximum number of milliseconds to wait for records to flush and
partition offset data to be"
+ " committed to offset storage before cancelling the process and
restoring the offset "
- + "data to be committed in a future attempt.";
+ + "data to be committed in a future attempt. This property has no
effect for source connectors "
+ + "running with exactly-once support.";
public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
public static final String LISTENERS_CONFIG = "listeners";
@@ -343,6 +344,15 @@ public class WorkerConfig extends AbstractConfig {
}
}
+ /**
+ * @return the {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG
bootstrap servers} property
+ * used by the worker when instantiating Kafka clients for connectors and
tasks (unless overridden)
+ * and its internal topics (if running in distributed mode)
+ */
+ public String bootstrapServers() {
+ return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
+ }
+
public Integer getRebalanceTimeout() {
return null;
}
@@ -351,6 +361,54 @@ public class WorkerConfig extends AbstractConfig {
return getBoolean(TOPIC_CREATION_ENABLE_CONFIG);
}
+ /**
+ * Whether this worker is configured with exactly-once support for source
connectors.
+ * The default implementation returns {@code false} and should be
overridden by subclasses
+ * if the worker mode for the subclass provides exactly-once support for
source connectors.
+ * @return whether exactly-once support is enabled for source connectors
on this worker
+ */
+ public boolean exactlyOnceSourceEnabled() {
+ return false;
+ }
+
+ /**
+ * Get the internal topic used by this worker to store source connector
offsets.
+ * The default implementation returns {@code null} and should be
overridden by subclasses
+ * if the worker mode for the subclass uses an internal offsets topic.
+ * @return the name of the internal offsets topic, or {@code null} if the
worker does not use
+ * an internal offsets topic
+ */
+ public String offsetsTopic() {
+ return null;
+ }
+
+ /**
+ * Determine whether this worker supports per-connector source offsets
topics.
+ * The default implementation returns {@code false} and should be
overridden by subclasses
+ * if the worker mode for the subclass supports per-connector offsets
topics.
+ * @return whether the worker supports per-connector offsets topics
+ */
+ public boolean connectorOffsetsTopicsPermitted() {
+ return false;
+ }
+
+ /**
+ * @return the offset commit interval for tasks created by this worker
+ */
+ public long offsetCommitInterval() {
+ return getLong(OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ }
+
+ /**
+ * Get the {@link CommonClientConfigs#GROUP_ID_CONFIG group ID} used by
this worker to form a cluster.
+ * The default implementation returns {@code null} and should be
overridden by subclasses
+ * if the worker mode for the subclass is capable of forming a cluster
using Kafka's group coordination API.
+ * @return the group ID for the worker's cluster, or {@code null} if the
worker is not capable of forming a cluster.
+ */
+ public String groupId() {
+ return null;
+ }
+
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String,
Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this,
parsedValues);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 23083f5e8e0..849a5969468 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
@@ -32,6 +33,7 @@ import java.security.InvalidParameterException;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -194,6 +197,34 @@ public class DistributedConfig extends WorkerConfig {
public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A
list of permitted algorithms for verifying internal requests";
public static final List<String>
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT =
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
+ private enum ExactlyOnceSourceSupport {
+ DISABLED(false),
+ PREPARING(true),
+ ENABLED(true);
+
+ public final boolean usesTransactionalLeader;
+
+ ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+ this.usesTransactionalLeader = usesTransactionalLeader;
+ }
+
+ public static ExactlyOnceSourceSupport fromProperty(String property) {
+ return
ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+ }
+
+ @Override
+ public String toString() {
+ return name().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG =
"exactly.once.source.support";
+ public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to
enable exactly-once support for source connectors in the cluster "
+ + "by using transactions to write source records and their source
offsets, and by proactively fencing out old task generations before bringing up
new ones. ";
+ // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
+ // + "See the exactly-once source support documentation at
[add docs link here] for more information on this feature.";
+ public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT =
ExactlyOnceSourceSupport.DISABLED.toString();
+
@SuppressWarnings("unchecked")
private static final ConfigDef CONFIG = baseConfigDef()
.define(GROUP_ID_CONFIG,
@@ -215,6 +246,12 @@ public class DistributedConfig extends WorkerConfig {
Math.toIntExact(TimeUnit.SECONDS.toMillis(3)),
ConfigDef.Importance.HIGH,
HEARTBEAT_INTERVAL_MS_DOC)
+ .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+ ConfigDef.Type.STRING,
+ EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
+
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+ ConfigDef.Importance.HIGH,
+ EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
.define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
ConfigDef.Type.LONG,
TimeUnit.MINUTES.toMillis(5),
@@ -399,13 +436,57 @@ public class DistributedConfig extends WorkerConfig {
ConfigDef.Importance.LOW,
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+ private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
+
@Override
public Integer getRebalanceTimeout() {
return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
}
+ @Override
+ public boolean exactlyOnceSourceEnabled() {
+ return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED;
+ }
+
+ /**
+ * @return whether the Connect cluster's leader should use a transactional
producer to perform writes to the config
+ * topic, which is useful for ensuring that zombie leaders are fenced out
and unable to write to the topic after a
+ * new leader has been elected.
+ */
+ public boolean transactionalLeaderEnabled() {
+ return exactlyOnceSourceSupport.usesTransactionalLeader;
+ }
+
+ /**
+ * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional
ID} to use for the worker's producer if
+ * using a transactional producer for writes to internal topics such as
the config topic.
+ */
+ public String transactionalProducerId() {
+ return transactionalProducerId(groupId());
+ }
+
+ public static String transactionalProducerId(String groupId) {
+ return "connect-cluster-" + groupId;
+ }
+
+ @Override
+ public String offsetsTopic() {
+ return getString(OFFSET_STORAGE_TOPIC_CONFIG);
+ }
+
+ @Override
+ public boolean connectorOffsetsTopicsPermitted() {
+ return true;
+ }
+
+ @Override
+ public String groupId() {
+ return getString(GROUP_ID_CONFIG);
+ }
+
public DistributedConfig(Map<String, String> props) {
super(CONFIG, props);
+ exactlyOnceSourceSupport =
ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
getInternalRequestKeyGenerator(); // Check here for a valid key size +
key algorithm to fail fast if either are invalid
validateKeyAlgorithmAndVerificationAlgorithms();
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5b9e199e5a1..862f31c0770 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -53,6 +53,7 @@ import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -142,10 +143,10 @@ public class AbstractHerderTest {
@MockStrict private Worker worker;
@MockStrict private WorkerConfigTransformer transformer;
- @MockStrict private Plugins plugins;
- @MockStrict private ClassLoader classLoader;
@MockStrict private ConfigBackingStore configStore;
@MockStrict private StatusBackingStore statusStore;
+ @MockStrict private ClassLoader classLoader;
+ @Mock private Plugins plugins;
@Test
public void testConnectors() {
@@ -436,13 +437,18 @@ public class AbstractHerderTest {
// We expect there to be errors due to the missing name and .... Note
that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to
change rarely.
assertEquals(SampleSourceConnector.class.getName(), result.name());
- assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
- ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP), result.groups());
+ assertEquals(
+ Arrays.asList(
+ ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
+ ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
+ SourceConnectorConfig.TOPIC_CREATION_GROUP,
SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP),
+ result.groups());
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(),
Function.identity()));
- // Base connector config has 14 fields, connector's configs add 2
- assertEquals(17, infos.size());
+ // Base connector config has 14 fields, connector's configs add 7
+ assertEquals(21, infos.size());
// Missing name should generate an error
assertEquals(ConnectorConfig.NAME_CONFIG,
infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@@ -531,6 +537,8 @@ public class AbstractHerderTest {
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP,
+ SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
"Transforms: xformA",
"Transforms: xformB"
);
@@ -538,7 +546,7 @@ public class AbstractHerderTest {
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(),
Function.identity()));
- assertEquals(22, infos.size());
+ assertEquals(26, infos.size());
// Should get 2 type fields from the transforms, first adds its own
config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@@ -590,6 +598,8 @@ public class AbstractHerderTest {
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP,
+ SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
"Transforms: xformA",
"Predicates: predX",
"Predicates: predY"
@@ -598,7 +608,7 @@ public class AbstractHerderTest {
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(),
Function.identity()));
- assertEquals(24, infos.size());
+ assertEquals(28, infos.size());
// Should get 2 type fields from the transforms, first adds its own
config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@@ -659,12 +669,14 @@ public class AbstractHerderTest {
ConnectorConfig.TRANSFORMS_GROUP,
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
- SourceConnectorConfig.TOPIC_CREATION_GROUP
+ SourceConnectorConfig.TOPIC_CREATION_GROUP,
+ SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP
);
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
- // Base connector config has 14 fields, connector's configs add 2, and
2 producer overrides
- assertEquals(19, result.values().size());
+ // Base connector config has 14 fields, connector's configs add 7, and
2 producer overrides
+ assertEquals(23, result.values().size());
assertTrue(result.values().stream().anyMatch(
configInfo -> ackConfigKey.equals(configInfo.configValue().name())
&& !configInfo.configValue().errors().isEmpty()));
assertTrue(result.values().stream().anyMatch(
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index 21364ae8e17..12085b21d96 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -28,11 +28,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
+import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.Assert.assertTrue;
public class DistributedConfigTest {
@@ -306,4 +309,42 @@ public class DistributedConfigTest {
() -> new DistributedConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+
+ @Test
+ public void shouldIdentifyNeedForTransactionalLeader() {
+ Map<String, String> workerProps = configs();
+
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+ assertFalse(new
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ assertTrue(new
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ assertTrue(new
DistributedConfig(workerProps).transactionalLeaderEnabled());
+ }
+
+ @Test
+ public void shouldConstructExpectedTransactionalId() {
+ Map<String, String> workerProps = configs();
+
+ workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing
unit tests");
+ assertEquals(
+ "connect-cluster-why did i stay up all night writing unit
tests",
+ new DistributedConfig(workerProps).transactionalProducerId()
+ );
+
+ workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+ assertEquals(
+ "connect-cluster-connect-cluster",
+ new DistributedConfig(workerProps).transactionalProducerId()
+ );
+
+ workerProps.put(GROUP_ID_CONFIG, "\u2603");
+ assertEquals(
+ "connect-cluster-\u2603",
+ new DistributedConfig(workerProps).transactionalProducerId()
+ );
+ }
+
}