rhauch commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429994168



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId 
id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, 
"connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 
connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, 
connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client 
will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, 
connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Yeah, I think it might help track down problems with admin principals.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = 
topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, 
topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);

Review comment:
   This is another thing that could potentially move into `TopicCreation`, for example by adding an `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of `TopicCreation`.
   
   ```suggestion
               topicCreation.addTopic(topic);
   ```
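   For illustration, a rough sketch of what such a helper might look like (the name and exact shape are up to you):
   ```java
       public void addTopic(String topic) {
           // Remember that this task has already verified or created the topic,
           // so later sends can skip the admin lookup entirely
           if (isTopicCreationEnabled) {
               topicCache.add(topic);
           }
       }
   ```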

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = 
"topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, 
it allows "
+            + "source connectors to create topics with custom settings. If 
enabled, in "
+            + "connectors that specify topic creation properties with the 
prefix `" + TOPIC_CREATION_PREFIX
+            + "` each task will use an admin client to create its topics and 
will not depend on "
+            + "auto.create.topics.enable being set on Kafka brokers.";

Review comment:
       Maybe:
   ```suggestion
    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
            + "source connectors to create topics by specifying topic creation properties "
            + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will use an "
            + "admin client to create its topics and will not depend on the Kafka brokers "
            + "to create topics automatically.";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, 
topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), 
Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, 
NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new 
LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, 
topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }

Review comment:
       Nit: Since this is static, how about moving it before the member fields and methods?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);

Review comment:
       This is another thing that could potentially move into `TopicCreation`, for example by adding an `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of `TopicCreation`.
   ```suggestion
               topicCreation.addTopic(topic);
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = 
topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());

Review comment:
       Couldn't this also be added to the `TopicCreation` class? Seems like 
it'd help unit test this logic in isolation, more easily covering the corner 
cases.
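   For example, the lookup could live behind something like this (hypothetical method name, reusing the fields already in `TopicCreation`):
   ```java
       public NewTopicCreationGroup findFirstGroup(String topic) {
           // Return the first configured group whose patterns match the topic,
           // falling back to the default group when none match
           return topicGroups.values().stream()
                   .filter(group -> group.matches(topic))
                   .findFirst()
                   .orElse(defaultTopicGroup);
       }
   ```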

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, 
topicGroups);
+    }
+
+    public static class TopicCreation {

Review comment:
       Seems like this could be pulled out into `utils` (?), at which point 
it's also a lot easier to test more thoroughly.
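   Once it's a standalone class, even a trivial unit test becomes straightforward, e.g. (a rough sketch using the accessors already referenced in this class):
   ```java
       @Test
       public void emptyTopicCreationShouldDisableTopicCreation() {
           // The empty instance should report topic creation as disabled and start with no cached topics
           TopicCreation topicCreation = TopicCreation.empty();
           assertFalse(topicCreation.isTopicCreationEnabled());
           assertTrue(topicCreation.topicCache().isEmpty());
       }
   ```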

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, 
topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), 
Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, 
NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new 
LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, 
topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }
+
+        public static TopicCreation empty() {
+            return EMPTY;
+        }

Review comment:
       Nit: Since this is static, how about moving it before the member fields and methods?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = 
topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, 
topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);
+            log.info("Created topic '{}' using creation group {}", newTopic, 
topicGroup);

Review comment:
       `NewTopicCreationGroup` still needs a `toString()` method, since it's 
used in this log statement.
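   Something along these lines would probably be enough (just a sketch based on the fields of the class; adjust as you see fit):
   ```java
       @Override
       public String toString() {
           return "NewTopicCreationGroup{name='" + name + '\''
                   + ", inclusionPattern=" + inclusionPattern
                   + ", exclusionPattern=" + exclusionPattern
                   + ", numPartitions=" + numPartitions
                   + ", replicationFactor=" + replicationFactor
                   + ", otherConfigs=" + otherConfigs
                   + '}';
       }
   ```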

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +192,76 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final String name;
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig 
config) {
+            this.name = group;
+            this.inclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationInclude(group)));
+            this.exclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationExclude(group)));
+            this.numPartitions = config.topicCreationPartitions(group);
+            this.replicationFactor = 
config.topicCreationReplicationFactor(group);
+            this.otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && 
inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }
+
+        public static Map<String, NewTopicCreationGroup> 
configuredGroups(SourceConnectorConfig config) {
+            List<String> groupNames = 
config.getList(TOPIC_CREATION_GROUPS_CONFIG);
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>();
+            for (String group : groupNames) {
+                groups.put(group, new NewTopicCreationGroup(group, config));
+            }
+            // Even if there was a group called 'default' in the config, it 
will be overriden here.
+            // Order matters for all the topic groups besides the default, 
since it will be
+            // removed from this collection by the Worker
+            groups.put(DEFAULT_TOPIC_CREATION_GROUP, new 
NewTopicCreationGroup(DEFAULT_TOPIC_CREATION_GROUP, config));
+            return groups;
+        }

Review comment:
       Nit: wouldn't this public static method be better before any of the 
member fields or methods?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }

Review comment:
       Again, this logic could be moved into `TopicCreation` for easier testing 
and to make this code more readable:
   ```suggestion
           if (!topicCreation.isCreateTopicRequired(topic)) {
               return;
           }
   ```
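   The corresponding method in `TopicCreation` could be as small as (just a sketch):
   ```java
       public boolean isCreateTopicRequired(String topic) {
           // Creation is only needed when the feature is enabled and this task
           // hasn't already verified or created the topic
           return isTopicCreationEnabled && !topicCache.contains(topic);
       }
   ```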

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";

Review comment:
       This still is unclear, particularly what "match their values" means. 
Maybe:
   ```suggestion
    private static final String EXCLUDE_REGEX_DOC = "A list of regular expression literals "
            + "used to match the names of topics used by the source connector. This list is used "
            + "to exclude topics from being created with the topic settings defined by this group. "
            + "Note that exclusion rules take precedence and override any inclusion rules for the topics.";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       This still is unclear, particularly what "match their values" means. 
Maybe:
   ```suggestion
    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "
            + "used to match the names of topics used by the source connector. This list is used "
            + "to include topics that should be created using the topic settings defined by this group.";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";

Review comment:
       Minor tweaks:
   ```suggestion
    private static final String PARTITIONS_DOC = "The number of partitions for new topics created "
            + "for this connector. This value may be -1 to use the broker's default number of partitions, "
            + "or a positive number representing the desired number of partitions. "
            + "For the default group this configuration is required. For any "
            + "other group defined in topic.creation.groups this config is optional and if it's "
            + "missing it gets the value of the default group";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"

Review comment:
       WDYT?
   ```suggestion
           () -> "Positive number not larger than the number of brokers in the 
Kafka cluster, or -1 to use the broker's default"
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";

Review comment:
       Minor tweaks:
   ```suggestion
    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
            + "created for this connector using this group. This value may be -1 to use the broker's "
            + "default replication factor, or may be a positive number not larger than the number of "
            + "brokers in the Kafka cluster. A value larger than the number of brokers in the Kafka cluster "
            + "will result in an error when the new topic is created. For the default group this configuration "
            + "is required. For any other group defined in topic.creation.groups this config is "
            + "optional and if it's missing it gets the value of the default group";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = 
"topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, 
it allows "
+            + "source connectors to create topics by specifying topic creation 
properties "
+            + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will 
use an "
+            + "admin client to create its topics and will not depend on the 
Kafka brokers "
+            + "to create topics automatically.";

Review comment:
       The "source connectors to create topics by specifying..." seems strange, 
since source connectors don't actually create topics and even if they did they 
wouldn't do so by specifying anything. Maybe:
   ```suggestion
    protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow "
            + "automatic creation of topics used by source connectors, when source connectors "
            + "are configured with `" + TOPIC_CREATION_PREFIX + "` properties. Each task will use an "
            + "admin client to create its topics and will not depend on the Kafka brokers "
            + "to create topics automatically.";
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
                 counter.retryRemaining();
                 return false;
+            } catch (ConnectException e) {
+                log.warn("{} Failed to send {} with unrecoverable exception: 
", this, producerRecord, e);

Review comment:
       This is a new log line, but it's similar to an existing one above. 
Nevertheless, this will write out the record's key and value to the log. We 
should instead only write the record coordinates.
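   E.g., logging only the coordinates might look like this (just a sketch; the exact wording is up to you):
   ```java
       // Log the topic and partition of the failed record rather than the record itself
       log.warn("{} Failed to send record to topic '{}' and partition '{}' with unrecoverable exception: ",
               this, producerRecord.topic(), producerRecord.partition(), e);
   ```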

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in 
regular expression");

Review comment:
       Should we include the `PatternSyntaxException` or its message in the 
`ConfigException`? As it stands, it will be clear *that* the regex is invalid 
but not *why* the regex is invalid.
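   E.g., the validator lambda could pass the message along (just a sketch):
   ```java
       (name, value) -> {
           try {
               ((List<String>) value).forEach(Pattern::compile);
           } catch (PatternSyntaxException e) {
               // Include the underlying reason so users can see why the regex is invalid
               throw new ConfigException(name, value, "Syntax error in regular expression: " + e.getMessage());
           }
       },
   ```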

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);

Review comment:
       I know this line existed before, but this writes out the record's key 
and value to the log. We should instead only write the record coordinates.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in 
regular expression");
+            }
+        },
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the 
broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short 
factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the 
broker's default");
+        }
+    }
+
+    public static ConfigDef configDef(String group, short 
defaultReplicationFactor, int defaultParitionCount) {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + group)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + group)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        defaultReplicationFactor, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, 
group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics 
in " + group)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        defaultParitionCount, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, group, 
++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " 
+ group);
+        return configDef;
+    }
+
+    public static ConfigDef defaultGroupConfigDef() {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, ".*",
+                        new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        ConfigDef.NO_DEFAULT_VALUE, 
REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, 
DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics 
in " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        ConfigDef.NO_DEFAULT_VALUE, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, 
DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " 
+ DEFAULT_TOPIC_CREATION_GROUP);
+        return configDef;
+    }

Review comment:
       I'm not sure whether this is a good idea or not, but if `configDef(String, ...)` were changed to take `Object` for the `defaultReplicationFactor` and `defaultPartitionCount`, then this method could be replaced with:
   ```suggestion
       public static ConfigDef defaultGroupConfigDef() {
           return configDef(DEFAULT_TOPIC_CREATION_GROUP, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.NO_DEFAULT_VALUE);
       }
   ```
   Even though we'd lose a bit of type safety on the `configDef(...)` method, we'd more clearly show how the default is similar to the other rules.
   
   Another alternative that maintains `configDef(...)` type safety is to accept `Short` and `Integer`, use `NO_DEFAULT_VALUE` if the parameters are null, and then change this method to:
   ```
       public static ConfigDef defaultGroupConfigDef() {
           return configDef(DEFAULT_TOPIC_CREATION_GROUP, null, null);
       }
   ```
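   
   To make that second alternative a bit more concrete, here's a rough, untested sketch of what the `configDef(...)` change might look like. Note that it glosses over the fact that the default group currently uses `".*"` as the include default and plain non-null validators for the regex lists:
   ```
       public static ConfigDef configDef(String group, Short defaultReplicationFactor, Integer defaultPartitionCount) {
           // Fall back to NO_DEFAULT_VALUE when no group-level default is supplied
           Object replicationFactorDefault = defaultReplicationFactor != null
                   ? defaultReplicationFactor : ConfigDef.NO_DEFAULT_VALUE;
           Object partitionsDefault = defaultPartitionCount != null
                   ? defaultPartitionCount : ConfigDef.NO_DEFAULT_VALUE;
           int orderInGroup = 0;
           ConfigDef configDef = new ConfigDef();
           configDef
                   .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
                           REGEX_VALIDATOR, ConfigDef.Importance.LOW,
                           INCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
                           "Inclusion Topic Pattern for " + group)
                   .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
                           REGEX_VALIDATOR, ConfigDef.Importance.LOW,
                           EXCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
                           "Exclusion Topic Pattern for " + group)
                   .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
                           replicationFactorDefault, REPLICATION_FACTOR_VALIDATOR,
                           ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup,
                           ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
                   .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
                           partitionsDefault, PARTITIONS_VALIDATOR,
                           ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup,
                           ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
           return configDef;
       }

       public static ConfigDef defaultGroupConfigDef() {
           return configDef(DEFAULT_TOPIC_CREATION_GROUP, null, null);
       }
   ```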
   
   Again, not sure it's worth doing this. Up to you.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);

Review comment:
       It may be worth changing this line in a subsequent PR that can be backported.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,157 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = 
TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of 
configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic 
Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> 
props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new 
HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, 
"Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, 
TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, 
TOPIC_CREATION_GROUPS_DISPLAY);
+    }
+
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, 
TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code 
ConfigDef}, using the current configuration specified in {@code props} as an 
input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, 
String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, 
props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + 
DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = 
defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = 
defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + 
TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, 
defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    @Override
+    public Object get(String key) {

Review comment:
       I'm fine with not ordering all public static methods first, but I think you'll agree that having this member getter method appearing before the constructor does not follow the conventions and patterns we use throughout the project.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in 
regular expression");
+            }
+        },
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the 
broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short 
factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the 
broker's default");

Review comment:
       How about:
   ```suggestion
                       "Replication factor must be positive and not larger than 
the number of brokers in the Kafka cluster, or -1 to use the broker's default");
   ```

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
##########
@@ -0,0 +1,1490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import 
org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreation;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
+    private static final String TOPIC = "topic";
+    private static final String OTHER_TOPIC = "other-topic";
+    private static final Map<String, byte[]> PARTITION = 
Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Integer> OFFSET = 
Collections.singletonMap("key", 12);
+
+    // Connect-format data
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final Integer KEY = -1;
+    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+    private static final Long RECORD = 12L;
+    // Serialized data. The actual format of this data doesn't matter -- we 
just want to see that the right version
+    // is used in the right place.
+    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+    private static final byte[] SERIALIZED_RECORD = 
"converted-record".getBytes();
+
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+    private WorkerConfig config;
+    private SourceConnectorConfig sourceConfig;
+    private Plugins plugins;
+    private MockConnectMetrics metrics;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
+    @Mock private TransformationChain<SourceRecord> transformationChain;
+    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private TopicAdmin admin;
+    @Mock private CloseableOffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
+    private WorkerSourceTask workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
+    @Mock private StatusBackingStore statusBackingStore;
+
+    private Capture<org.apache.kafka.clients.producer.Callback> 
producerCallbacks;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, 
RECORD_SCHEMA, RECORD)
+    );
+
+    // when this test becomes parameterized, this variable will be a test 
parameter
+    public boolean enableTopicCreation = true;
+
+    @Override
+    public void setup() {
+        super.setup();
+        Map<String, String> workerProps = workerProps();
+        plugins = new Plugins(workerProps);
+        config = new StandaloneConfig(workerProps);
+        sourceConfig = new SourceConnectorConfig(plugins, 
sourceConnectorPropsWithGroups(TOPIC), true);
+        producerCallbacks = EasyMock.newCapture();
+        metrics = new MockConnectMetrics();
+    }
+
+    private Map<String, String> workerProps() {
+        Map<String, String> props = new HashMap<>();
+        props.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        props.put(TOPIC_CREATION_ENABLE_CONFIG, 
String.valueOf(enableTopicCreation));
+        return props;
+    }
+
+    private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "foo-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+        props.put(TOPIC_CONFIG, topic);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
+        props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", 
"bar"));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, 
String.valueOf(1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, 
String.valueOf(1));
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + 
INCLUDE_REGEX_CONFIG, topic);
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + 
INCLUDE_REGEX_CONFIG, ".*");
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + 
EXCLUDE_REGEX_CONFIG, topic);
+        return props;
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
+    private void createWorkerTask() {
+        createWorkerTask(TargetState.STARTED);
+    }
+
+    private void createWorkerTask(TargetState initialState) {
+        createWorkerTask(initialState, keyConverter, valueConverter, 
headerConverter);
+    }
+
+    private void createWorkerTask(TargetState initialState, Converter 
keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, 
initialState, keyConverter, valueConverter, headerConverter,
+                transformationChain, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig),
+                offsetReader, offsetWriter, config, clusterConfigState, 
metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, 
statusBackingStore);
+    }
+
+    @Test
+    public void testStartPaused() throws Exception {
+        final CountDownLatch pauseLatch = new CountDownLatch(1);
+
+        createWorkerTask(TargetState.PAUSED);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                pauseLatch.countDown();
+                return null;
+            }
+        });
+
+        expectClose();
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        AtomicInteger count = new AtomicInteger(0);
+        CountDownLatch pollLatch = expectPolls(10, count);
+        // In this test, we don't flush, so nothing goes any further than the 
offset writer
+
+        expectTopicCreation(TOPIC);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+        assertTrue(awaitLatch(pollLatch));
+
+        workerTask.transitionTo(TargetState.PAUSED);
+
+        int priorCount = count.get();
+        Thread.sleep(100);
+
+        // since the transition is observed asynchronously, the count could be 
off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
+
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the 
offset writer
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(10);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = new CountDownLatch(1);
+        final RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andAnswer(new 
IAnswer<List<SourceRecord>>() {
+            @Override
+            public List<SourceRecord> answer() throws Throwable {
+                pollLatch.countDown();
+                throw exception;
+            }
+        });
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollReturnsNoRecords() throws Exception {
+        // Test that the task handles an empty list of records
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(false);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsPropagatesTimestamp() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(timestamp, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testSendRecordsCorruptTimestamp() throws Exception {
+        final Long timestamp = -3L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsNoTimestamp() throws Exception {
+        final Long timestamp = -1L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsRetries() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion 
expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // First round
+        expectSendRecordOnce(false);
+        // Any Producer retriable exception should work here
+        expectSendRecordSyncFailure(new 
org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+        // Second round
+        expectSendRecordOnce(true);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record2, record3), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerSendFailsImmediately() throws Exception 
{
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        expectTopicCreation(TOPIC);
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject()))
+                .andThrow(new KafkaException("Producer closed while send in 
progress", new InvalidTopicException(TOPIC)));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTaskCommitRecordFail() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion 
expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // Source task commit record failure will not cause the task to abort
+        expectSendRecordOnce(false);
+        expectSendRecordTaskCommitRecordFail(false, false);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSlowTaskStart() throws Exception {
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch finishStartupLatch = new CountDownLatch(1);
+
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                startupLatch.countDown();
+                assertTrue(awaitLatch(finishStartupLatch));
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> workerTaskFuture = executor.submit(workerTask);
+
+        // Stopping immediately while the other thread has work to do should 
result in no polling, no offset commits,
+        // exiting the work thread immediately, and the stop() method will be 
invoked in the background thread since it
+        // cannot be invoked immediately in the thread trying to stop the task.
+        assertTrue(awaitLatch(startupLatch));
+        workerTask.stop();
+        finishStartupLatch.countDown();
+        assertTrue(workerTask.awaitStop(1000));
+
+        workerTaskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancel() {
+        createWorkerTask();
+
+        offsetReader.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.cancel();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testMetricsGroup() {
+        SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, 
metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, 
metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordPoll(100, 1000 + i * 100);
+            group.recordWrite(10);
+        }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
+        assertEquals(1900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1450.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(33.333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-rate"), 0.001d);
+        assertEquals(1000, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-total"), 0.001d);
+        assertEquals(3.3333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(100, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : 
group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-rate"), 0.001d);
+        assertEquals(2000, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-total"), 0.001d);
+        assertEquals(6.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(200, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(1800.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-active-count"), 0.001d);
+    }
+
+    @Test
+    public void testHeaders() throws Exception {
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        org.apache.kafka.connect.header.Headers connectHeaders = new 
ConnectHeaders();
+        connectHeaders.add("header_key", new 
SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
+
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, 
true, false, true, true, true, headers);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertEquals(headers, sent.getValue().headers());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() throws Exception {
+        StringConverter stringConverter = new StringConverter();
+        TestConverterWithHeaders testConverter = new 
TestConverterWithHeaders();
+
+        createWorkerTask(TargetState.STARTED, stringConverter, testConverter, 
stringConverter);
+
+        List<SourceRecord> records = new ArrayList<>();
+
+        String stringA = "Árvíztűrő tükörfúrógép";
+        org.apache.kafka.connect.header.Headers headersA = new 
ConnectHeaders();
+        String encodingA = "latin2";
+        headersA.addString("encoding", encodingA);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
+
+        String stringB = "Тестовое сообщение";
+        org.apache.kafka.connect.header.Headers headersB = new 
ConnectHeaders();
+        String encodingB = "koi8_r";
+        headersB.addString("encoding", encodingB);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = 
expectSendRecord(TOPIC, false, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = 
expectSendRecord(TOPIC, false, false, true, true, false, null);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+
+        assertEquals(ByteBuffer.wrap("a".getBytes()), 
ByteBuffer.wrap(sentRecordA.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringA.getBytes(encodingA)),
+            ByteBuffer.wrap(sentRecordA.getValue().value())
+        );
+        assertEquals(encodingA, new 
String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
+
+        assertEquals(ByteBuffer.wrap("b".getBytes()), 
ByteBuffer.wrap(sentRecordB.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringB.getBytes(encodingB)),
+            ByteBuffer.wrap(sentRecordB.getValue().value())
+        );
+        assertEquals(encodingB, new 
String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTopicCreateWhenTopicExists() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC,
 topicDesc));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        // First round - call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round - calls to describe and create succeed
+        expectTopicCreation(TOPIC);
+        // Exactly two records are sent
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First call to describe the topic times out
+        expectPreliminaryCalls();
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        // First call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
+        // First call to create the topic times out
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicDescribeFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsEnabled() {
+        TopicCreationGroup expectedDefaultGroup =
+                TopicCreationGroup.configuredGroups(sourceConfig).get(DEFAULT_TOPIC_CREATION_GROUP);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(TOPIC));
+        assertThat(topicCreation.defaultTopicGroup(), is(expectedDefaultGroup));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems("foo", "bar"));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsDisabled() {
+        Map<String, String> workerProps = workerProps();
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        config = new StandaloneConfig(workerProps);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testEmptyTopicCreationClass() {
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config, null);
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }

Review comment:
       This test class is already fairly complex. Can these move to a new 
`TopicCreationTest` class to correspond to the new `TopicCreation` class?
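       For whoever picks that up, here is a rough sketch of what such a class could start from. This is only illustrative: the worker properties, the test name, and the assumption that the class lives in the same package as `TopicCreation` are mine, not part of this PR.
   ```java
   import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
   import org.junit.Before;
   import org.junit.Test;

   import java.util.HashMap;
   import java.util.Map;

   import static org.junit.Assert.assertFalse;

   // Hypothetical skeleton for a dedicated TopicCreationTest; assumes it sits next to
   // TopicCreation so no extra import is needed for that class.
   public class TopicCreationTest {

       private static final String TOPIC = "topic";

       private Map<String, String> workerProps;
       private StandaloneConfig config;

       @Before
       public void setup() {
           // Minimal standalone worker config; the values are placeholders for tests only
           workerProps = new HashMap<>();
           workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
           workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
           workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
           config = new StandaloneConfig(workerProps);
       }

       @Test
       public void topicCreationShouldBeDisabledWithoutConfiguredGroups() {
           // Mirrors testEmptyTopicCreationClass above: no groups means creation stays off
           TopicCreation topicCreation = TopicCreation.newTopicCreation(config, null);

           assertFalse(topicCreation.isTopicCreationEnabled());
           assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
       }
   }
   ```
       The three TopicCreation-specific tests above could then move over largely unchanged, leaving WorkerSourceTaskTest with only the send/describe/create retry paths.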

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -166,11 +176,14 @@ protected void close() {
                 log.warn("Could not close producer", t);
             }
         }
-        try {
-            transformationChain.close();
-        } catch (Throwable t) {
-            log.warn("Could not close transformation chain", t);
+        if (admin != null) {
+            try {
+                admin.close(Duration.ofSeconds(30));
+            } catch (Throwable t) {
+                log.warn("Failed to close admin client on time", t);
+            }

Review comment:
       Should this have a `finally` block that nulls the `admin` field?
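       For illustration, one shape that could take is below; this assumes the `admin` field is not final (or is made non-final), and it simply reuses the 30-second timeout and log message from the hunk above.
   ```java
   // Sketch of the admin-client handling in WorkerSourceTask.close(), with a finally
   // block that clears the field so a repeated close() cannot touch the client again.
   if (admin != null) {
       try {
           admin.close(Duration.ofSeconds(30));
       } catch (Throwable t) {
           log.warn("Failed to close admin client on time", t);
       } finally {
           admin = null;
       }
   }
   ```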

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,167 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the base {@code ConfigDef}, using the current configuration specified in {@code props} as input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non-parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
         super(plugins, config, props);
+        if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
+            ConfigDef defaultConfigDef = embedDefaultGroup(config);
+            // This config is only used to set default values for partitions and replication
+            // factor from the default group and otherwise it remains unused
+            AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, props, false);
+
+            // If the user has added include or exclude regex patterns in the default group,
+            // they should be ignored.

Review comment:
       Nice catch.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "
+            + "used to match the names of topics used by the source connector. This list is used "
+            + "to include topics that should be created using the topic settings defined by this group.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "

Review comment:
       This is misnamed, and that was my fault when I made a wrong suggestion 
earlier.
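       Presumably the fix is just renaming the constant so it matches the exclude config it documents, e.g.:
   ```suggestion
       private static final String EXCLUDE_REGEX_DOC = "A list of regular expression literals "
   ```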




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

