rhauch commented on a change in pull request #8722: URL: https://github.com/apache/kafka/pull/8722#discussion_r429994168
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { if (topic != null && !topic.isEmpty()) { Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy); - Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy); + // Leaving default client id empty means that the admin client will set the default at instantiation time + Map<String, Object> adminProps = adminConfigs(id, "", config, connConfig, connectorClass, connectorClientConfigOverridePolicy); Review comment: Yeah, I think it might help track down problems with admin principals. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { return true; } + // Due to transformations that may change the destination topic of a record (such as + // RegexRouter) topic creation can not be batched for multiple topics + private void maybeCreateTopic(String topic) { + if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) { + return; + } + log.info("The task will send records to topic '{}' for the first time. 
Checking " + + "whether topic exists", topic); + Map<String, TopicDescription> existing = admin.describeTopics(topic); + if (!existing.isEmpty()) { + log.info("Topic '{}' already exists.", topic); + topicCreation.topicCache().add(topic); + return; + } + + log.info("Creating topic '{}'", topic); + NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream() + .filter(group -> group.matches(topic)) + .findFirst() + .orElse(topicCreation.defaultTopicGroup()); + log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup); + NewTopic newTopic = topicGroup.newTopic(topic); + + if (admin.createTopic(newTopic)) { + topicCreation.topicCache().add(topic); Review comment: Another thing for potentially moving into `TopicCreation`, such as maybe adding a `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of TopicCreation. ```suggestion topicCreation.addTopic(topic); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -250,9 +251,17 @@ + "user requests to reset the set of active topics per connector."; protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; + public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable"; + protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows " + + "source connectors to create topics with custom settings. If enabled, in " + + "connectors that specify topic creation properties with the prefix `" + TOPIC_CREATION_PREFIX + + "` each task will use an admin client to create its topics and will not depend on " + + "auto.create.topics.enable being set on Kafka brokers."; Review comment: Maybe: ```suggestion protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows " + "source connectors to create topics by specifying topic creation properties " + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. 
Each task will use an " + "admin client to create its topics and will not depend on the Kafka brokers " + "to create topics automatically."; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.producerSendException = new AtomicReference<>(); this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); + } + + public static class TopicCreation { + private static final TopicCreation EMPTY = + new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet()); + + private final boolean isTopicCreationEnabled; + private final NewTopicCreationGroup defaultTopicGroup; + private final Map<String, NewTopicCreationGroup> topicGroups; + private final Set<String> topicCache; + + protected TopicCreation(boolean isTopicCreationEnabled, + NewTopicCreationGroup defaultTopicGroup, + Map<String, NewTopicCreationGroup> topicGroups, + Set<String> topicCache) { + this.isTopicCreationEnabled = isTopicCreationEnabled; + this.defaultTopicGroup = defaultTopicGroup; + this.topicGroups = topicGroups; + this.topicCache = topicCache; + } + + public static TopicCreation newTopicCreation(WorkerConfig workerConfig, + Map<String, NewTopicCreationGroup> topicGroups) { + if (!workerConfig.topicCreationEnable() || topicGroups == null) { + return EMPTY; + } + Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(topicGroups); + groups.remove(DEFAULT_TOPIC_CREATION_GROUP); + return new TopicCreation(true, topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>()); + } Review comment: Nit: Since this is static, how about moving to before the member fields and methods? 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { return true; } + // Due to transformations that may change the destination topic of a record (such as + // RegexRouter) topic creation can not be batched for multiple topics + private void maybeCreateTopic(String topic) { + if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) { + return; + } + log.info("The task will send records to topic '{}' for the first time. Checking " + + "whether topic exists", topic); + Map<String, TopicDescription> existing = admin.describeTopics(topic); + if (!existing.isEmpty()) { + log.info("Topic '{}' already exists.", topic); + topicCreation.topicCache().add(topic); Review comment: Another thing for potentially moving into `TopicCreation`, such as maybe adding a `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of `TopicCreation`. ```suggestion topicCreation.addTopic(topic); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { return true; } + // Due to transformations that may change the destination topic of a record (such as + // RegexRouter) topic creation can not be batched for multiple topics + private void maybeCreateTopic(String topic) { + if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) { + return; + } + log.info("The task will send records to topic '{}' for the first time. 
Checking " + + "whether topic exists", topic); + Map<String, TopicDescription> existing = admin.describeTopics(topic); + if (!existing.isEmpty()) { + log.info("Topic '{}' already exists.", topic); + topicCreation.topicCache().add(topic); + return; + } + + log.info("Creating topic '{}'", topic); + NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream() + .filter(group -> group.matches(topic)) + .findFirst() + .orElse(topicCreation.defaultTopicGroup()); Review comment: Couldn't this also be added to the `TopicCreation` class? Seems like it'd help unit test this logic in isolation, more easily covering the corner cases. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.producerSendException = new AtomicReference<>(); this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); + } + + public static class TopicCreation { Review comment: Seems like this could be pulled out into `utils` (?), at which point it's also a lot easier to test more thoroughly. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.producerSendException = new AtomicReference<>(); this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); + } + + public static class TopicCreation { + private static final TopicCreation EMPTY = + new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet()); + + private final boolean isTopicCreationEnabled; + private final NewTopicCreationGroup defaultTopicGroup; + private final Map<String, NewTopicCreationGroup> topicGroups; + private final Set<String> topicCache; + + protected TopicCreation(boolean isTopicCreationEnabled, + NewTopicCreationGroup defaultTopicGroup, + Map<String, NewTopicCreationGroup> topicGroups, + Set<String> topicCache) { + this.isTopicCreationEnabled = isTopicCreationEnabled; + this.defaultTopicGroup = defaultTopicGroup; + this.topicGroups = topicGroups; + this.topicCache = topicCache; + } + + public static TopicCreation newTopicCreation(WorkerConfig workerConfig, + Map<String, NewTopicCreationGroup> topicGroups) { + if (!workerConfig.topicCreationEnable() || topicGroups == null) { + return EMPTY; + } + Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(topicGroups); + groups.remove(DEFAULT_TOPIC_CREATION_GROUP); + return new TopicCreation(true, topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>()); + } + + public static TopicCreation empty() { + return EMPTY; + } Review comment: Nit: Since this is static, how about moving to before the member fields and methods? 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { return true; } + // Due to transformations that may change the destination topic of a record (such as + // RegexRouter) topic creation can not be batched for multiple topics + private void maybeCreateTopic(String topic) { + if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) { + return; + } + log.info("The task will send records to topic '{}' for the first time. Checking " + + "whether topic exists", topic); + Map<String, TopicDescription> existing = admin.describeTopics(topic); + if (!existing.isEmpty()) { + log.info("Topic '{}' already exists.", topic); + topicCreation.topicCache().add(topic); + return; + } + + log.info("Creating topic '{}'", topic); + NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream() + .filter(group -> group.matches(topic)) + .findFirst() + .orElse(topicCreation.defaultTopicGroup()); + log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup); + NewTopic newTopic = topicGroup.newTopic(topic); + + if (admin.createTopic(newTopic)) { + topicCreation.topicCache().add(topic); + log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); Review comment: `NewTopicCreationGroup` still needs a `toString()` method, since it's used in this log statement. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ########## @@ -178,6 +192,76 @@ public NewTopic build() { } } + public static class NewTopicCreationGroup { + private final String name; + private final Pattern inclusionPattern; + private final Pattern exclusionPattern; + private final int numPartitions; + private final short replicationFactor; + private final Map<String, Object> otherConfigs; + + protected NewTopicCreationGroup(String group, SourceConnectorConfig config) { + this.name = group; + this.inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group))); + this.exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group))); + this.numPartitions = config.topicCreationPartitions(group); + this.replicationFactor = config.topicCreationReplicationFactor(group); + this.otherConfigs = config.topicCreationOtherConfigs(group); + } + + public String name() { + return name; + } + + public boolean matches(String topic) { + return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches(); + } + + public NewTopic newTopic(String topic) { + NewTopicBuilder builder = new NewTopicBuilder(topic); + return builder.partitions(numPartitions) + .replicationFactor(replicationFactor) + .config(otherConfigs) + .build(); + } + + public static Map<String, NewTopicCreationGroup> configuredGroups(SourceConnectorConfig config) { + List<String> groupNames = config.getList(TOPIC_CREATION_GROUPS_CONFIG); + Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(); + for (String group : groupNames) { + groups.put(group, new NewTopicCreationGroup(group, config)); + } + // Even if there was a group called 'default' in the config, it will be overriden here. 
+ // Order matters for all the topic groups besides the default, since it will be + // removed from this collection by the Worker + groups.put(DEFAULT_TOPIC_CREATION_GROUP, new NewTopicCreationGroup(DEFAULT_TOPIC_CREATION_GROUP, config)); + return groups; + } Review comment: Nit: wouldn't this public static method be better before any of the member fields or methods? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { return true; } + // Due to transformations that may change the destination topic of a record (such as + // RegexRouter) topic creation can not be batched for multiple topics + private void maybeCreateTopic(String topic) { + if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) { + return; + } Review comment: Again, this logic could be moved into `TopicCreation` for easier testing and to make this code more readable: ```suggestion if (!topicCreation.isCreateTopicRequired(topic)) { return; } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; Review comment: This still is unclear, particularly what "match their values" means. Maybe: ```suggestion private static final String EXCLUDE_REGEX_DOC = "A list of regular expression literals " + "used to match the names of topics used by the source connector. This list is used " + "to exclude topics from being created with the topic settings defined by this group. 
" + "Note that exclusion rules have precedent and override any inclusion rules for the topics."; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. 
This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; Review comment: This still is unclear, particularly what "match their values" means. Maybe: ```suggestion private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals " + "used to match the names of topics used by the source connector. This list is used " + "to include topics that should be created using the topic settings defined by this group."; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. 
For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; + + public static final String PARTITIONS_CONFIG = "partitions"; + private static final String PARTITIONS_DOC = "The number of partitions new topics created for" + + " this connector. For the default group this configuration is required. For any " + + "other group defined in topic.creation.groups this config is optional and if it's " + + "missing it gets the value the default group"; Review comment: Minor tweaks: ```suggestion private static final String PARTITIONS_DOC = "The number of partitions new topics created for " + "this connector. This value may be -1 to use the broker's default number of partitions, " + "or a positive number representing the desired number of partitions. " + "For the default group this configuration is required. For any " + "other group defined in topic.creation.groups this config is optional and if it's " + "missing it gets the value of the default group"; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. 
For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; + + public static final String PARTITIONS_CONFIG = "partitions"; + private static final String PARTITIONS_DOC = "The number of partitions new topics created for" + + " this connector. For the default group this configuration is required. For any " + + "other group defined in topic.creation.groups this config is optional and if it's " + + "missing it gets the value the default group"; + + public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validateReplicationFactor(name, (short) value), + () -> "Positive number, or -1 to use the broker's default" Review comment: WDYT? ```suggestion () -> "Positive number not larger than the number of brokers in the Kafka cluster, or -1 to use the broker's default" ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. 
For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; Review comment: Minor tweaks: ```suggestion private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + "created for this connector using this group. This value may be -1 to use the broker's " + "default replication factor, or may be a positive number not larger than the number of " + "brokers in the Kafka cluster. A value larger than the number of brokers in the Kafka cluster " + "will result in an error when the new topic is created. For the default group this configuration " + "is required. For any other group defined in topic.creation.groups this config is " + "optional and if it's missing it gets the value of the default group"; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -250,9 +251,17 @@ + "user requests to reset the set of active topics per connector."; protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; + public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable"; + protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows " + + "source connectors to create topics by specifying topic creation properties " + + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will use an " + + "admin client to create its topics and will not depend on the Kafka brokers " + + "to create topics automatically."; Review comment: The "source connectors to create topics by specifying..." seems strange, since source connectors don't actually create topics and even if they did they wouldn't do so by specifying anything. 
Maybe: ```suggestion protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow " + "automatic creation of topics used by source connectors, when source connectors " + "are configured with `" + TOPIC_CREATION_PREFIX + "` properties. Each task will use an " + "admin client to create its topics and will not depend on the Kafka brokers " + "to create topics automatically."; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -344,37 +357,38 @@ private boolean sendRecords() { } } try { + maybeCreateTopic(record.topic()); final String topic = producerRecord.topic(); producer.send( - producerRecord, - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e); - log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); - producerSendException.compareAndSet(null, e); - } else { - recordSent(producerRecord); - counter.completeRecord(); - log.trace("{} Wrote record successfully: topic {} partition {} offset {}", - WorkerSourceTask.this, - recordMetadata.topic(), recordMetadata.partition(), - recordMetadata.offset()); - commitTaskRecord(preTransformRecord, recordMetadata); - if (isTopicTrackingEnabled) { - recordActiveTopic(producerRecord.topic()); - } - } + producerRecord, + (recordMetadata, e) -> { + if (e != null) { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); + producerSendException.compareAndSet(null, e); + } else { + recordSent(producerRecord); + counter.completeRecord(); + log.trace("{} Wrote record successfully: topic {} partition {} offset {}", + WorkerSourceTask.this, + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + commitTaskRecord(preTransformRecord, recordMetadata); 
+ if (isTopicTrackingEnabled) { + recordActiveTopic(producerRecord.topic()); } - }); + } + }); lastSendFailed = false; - } catch (org.apache.kafka.common.errors.RetriableException e) { - log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); + } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { + log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e); toSend = toSend.subList(processed, toSend.size()); lastSendFailed = true; counter.retryRemaining(); return false; + } catch (ConnectException e) { + log.warn("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e); Review comment: This is a new log line, but it's similar to an existing one above. Nevertheless, this will write out the record's key and value to the log. We should instead only write the record coordinates. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. 
For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; + + public static final String PARTITIONS_CONFIG = "partitions"; + private static final String PARTITIONS_DOC = "The number of partitions new topics created for" + + " this connector. For the default group this configuration is required. For any " + + "other group defined in topic.creation.groups this config is optional and if it's " + + "missing it gets the value the default group"; + + public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validateReplicationFactor(name, (short) value), + () -> "Positive number, or -1 to use the broker's default" + ); + public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validatePartitions(name, (int) value), + () -> "Positive number, or -1 to use the broker's default" + ); + @SuppressWarnings("unchecked") + public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> { + try { + ((List<String>) value).forEach(Pattern::compile); + } catch (PatternSyntaxException e) { + throw new ConfigException(name, value, "Syntax error in regular expression"); Review comment: Should we include the `PatternSyntaxException` or its message in the `ConfigException`? As it stands, it will be clear *that* the regex is invalid but not *why* the regex is invalid. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -344,37 +357,38 @@ private boolean sendRecords() { } } try { + maybeCreateTopic(record.topic()); final String topic = producerRecord.topic(); producer.send( - producerRecord, - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e); - log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); - producerSendException.compareAndSet(null, e); - } else { - recordSent(producerRecord); - counter.completeRecord(); - log.trace("{} Wrote record successfully: topic {} partition {} offset {}", - WorkerSourceTask.this, - recordMetadata.topic(), recordMetadata.partition(), - recordMetadata.offset()); - commitTaskRecord(preTransformRecord, recordMetadata); - if (isTopicTrackingEnabled) { - recordActiveTopic(producerRecord.topic()); - } - } + producerRecord, + (recordMetadata, e) -> { + if (e != null) { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); + producerSendException.compareAndSet(null, e); + } else { + recordSent(producerRecord); + counter.completeRecord(); + log.trace("{} Wrote record successfully: topic {} partition {} offset {}", + WorkerSourceTask.this, + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + commitTaskRecord(preTransformRecord, recordMetadata); + if (isTopicTrackingEnabled) { + recordActiveTopic(producerRecord.topic()); } - }); + } + }); lastSendFailed = false; - } catch (org.apache.kafka.common.errors.RetriableException e) { - log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); + } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { + log.warn("{} Failed to send 
{}, backing off before retrying: ", this, producerRecord, e); Review comment: I know this line existed before, but this writes out the record's key and value to the log. We should instead only write the record coordinates. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. 
This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; + + public static final String PARTITIONS_CONFIG = "partitions"; + private static final String PARTITIONS_DOC = "The number of partitions new topics created for" + + " this connector. For the default group this configuration is required. 
For any " + + "other group defined in topic.creation.groups this config is optional and if it's " + + "missing it gets the value the default group"; + + public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validateReplicationFactor(name, (short) value), + () -> "Positive number, or -1 to use the broker's default" + ); + public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validatePartitions(name, (int) value), + () -> "Positive number, or -1 to use the broker's default" + ); + @SuppressWarnings("unchecked") + public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> { + try { + ((List<String>) value).forEach(Pattern::compile); + } catch (PatternSyntaxException e) { + throw new ConfigException(name, value, "Syntax error in regular expression"); + } + }, + () -> "Positive number, or -1 to use the broker's default" + ); + + private static void validatePartitions(String configName, int factor) { + if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) { + throw new ConfigException(configName, factor, + "Number of partitions must be positive, or -1 to use the broker's default"); + } + } + + private static void validateReplicationFactor(String configName, short factor) { + if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) { + throw new ConfigException(configName, factor, + "Replication factor must be positive, or -1 to use the broker's default"); + } + } + + public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) { + int orderInGroup = 0; + ConfigDef configDef = new ConfigDef(); + configDef + .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), + REGEX_VALIDATOR, ConfigDef.Importance.LOW, + INCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG, + "Inclusion Topic Pattern for " + group) + 
.define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), + REGEX_VALIDATOR, ConfigDef.Importance.LOW, + EXCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG, + "Exclusion Topic Pattern for " + group) + .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, + defaultReplicationFactor, REPLICATION_FACTOR_VALIDATOR, + ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup, + ConfigDef.Width.LONG, "Replication Factor for Topics in " + group) + .define(PARTITIONS_CONFIG, ConfigDef.Type.INT, + defaultParitionCount, PARTITIONS_VALIDATOR, + ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup, + ConfigDef.Width.LONG, "Partition Count for Topics in " + group); + return configDef; + } + + public static ConfigDef defaultGroupConfigDef() { + int orderInGroup = 0; + ConfigDef configDef = new ConfigDef(); + configDef + .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, ".*", + new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW, + INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG, + "Inclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP) + .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), + new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW, + EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG, + "Exclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP) + .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, + ConfigDef.NO_DEFAULT_VALUE, REPLICATION_FACTOR_VALIDATOR, + ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, + ConfigDef.Width.LONG, "Replication Factor for Topics in " + DEFAULT_TOPIC_CREATION_GROUP) + .define(PARTITIONS_CONFIG, ConfigDef.Type.INT, + ConfigDef.NO_DEFAULT_VALUE, PARTITIONS_VALIDATOR, + ConfigDef.Importance.LOW, PARTITIONS_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, + ConfigDef.Width.LONG, "Partition Count for Topics in " 
+ DEFAULT_TOPIC_CREATION_GROUP); + return configDef; + } Review comment: I'm not sure whether this is a good idea or not, but if `configDef(String, ...)` were changed to take `Object` for the `defaultReplicationFactor` and `defaultPartitionCount`, then this method could be replaced with: ```suggestion public static ConfigDef defaultGroupConfigDef() { return configDef(DEFAULT_TOPIC_CREATION_GROUP, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.NO_DEFAULT_VALUE); } ``` Even though we'd lose a bit of type safety on the `configDef(...)` method, we'd more clearly show how the default is similar to the other rules. Another alternative to maintain `configDef(...)` type safety is to accept `Short` and `Integer`, use `NO_DEFAULT_VALUE` if the parameters are null, and then change this method to: ``` public static ConfigDef defaultGroupConfigDef() { return configDef(DEFAULT_TOPIC_CREATION_GROUP, null, null); } ``` Again, not sure it's worth doing this. Up to you. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -344,37 +357,38 @@ private boolean sendRecords() { } } try { + maybeCreateTopic(record.topic()); final String topic = producerRecord.topic(); producer.send( - producerRecord, - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e); - log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); - producerSendException.compareAndSet(null, e); - } else { - recordSent(producerRecord); - counter.completeRecord(); - log.trace("{} Wrote record successfully: topic {} partition {} offset {}", - WorkerSourceTask.this, - recordMetadata.topic(), recordMetadata.partition(), - recordMetadata.offset()); - commitTaskRecord(preTransformRecord, recordMetadata); - if (isTopicTrackingEnabled) { - recordActiveTopic(producerRecord.topic()); - } - } + producerRecord, + 
(recordMetadata, e) -> { + if (e != null) { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); + producerSendException.compareAndSet(null, e); + } else { + recordSent(producerRecord); + counter.completeRecord(); + log.trace("{} Wrote record successfully: topic {} partition {} offset {}", + WorkerSourceTask.this, + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + commitTaskRecord(preTransformRecord, recordMetadata); + if (isTopicTrackingEnabled) { + recordActiveTopic(producerRecord.topic()); } - }); + } + }); lastSendFailed = false; - } catch (org.apache.kafka.common.errors.RetriableException e) { - log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); + } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { + log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e); Review comment: It may be worth changing this line in a subsequent PR that can be backported. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java ########## @@ -16,21 +16,157 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.runtime.isolation.Plugins; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; public class SourceConnectorConfig extends ConnectorConfig { - private static ConfigDef config = ConnectorConfig.configDef(); + protected static final String TOPIC_CREATION_GROUP = "Topic Creation"; + + public static final String TOPIC_CREATION_PREFIX = "topic.creation."; + + public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups"; + private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics " + + "created by source connectors"; + private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups"; + + private static class EnrichedSourceConnectorConfig extends AbstractConfig { + EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) { + super(configDef, props); + } + + @Override + public Object get(String key) { + return super.get(key); + } + 
} + + private static ConfigDef config = SourceConnectorConfig.configDef(); + private final EnrichedSourceConnectorConfig enrichedSourceConfig; public static ConfigDef configDef() { - return config; + int orderInGroup = 0; + return new ConfigDef(ConnectorConfig.configDef()) + .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), + ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with( + (name, value) -> { + List<?> groupAliases = (List<?>) value; + if (groupAliases.size() > new HashSet<>(groupAliases).size()) { + throw new ConfigException(name, value, "Duplicate alias provided."); + } + }, + () -> "unique topic creation groups")), + ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP, + ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY); + } + + public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) { + String defaultGroup = "default"; + ConfigDef newDefaultDef = new ConfigDef(baseConfigDef); + newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef()); + return newDefaultDef; + } + + /** + * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input. 
+ * + * @param baseConfigDef the base configuration definition to be enriched + * @param props the non parsed configuration properties + * @return the enriched configuration definition + */ + public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) { + List<Object> topicCreationGroups = new ArrayList<>(); + Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST); + if (aliases instanceof List) { + topicCreationGroups.addAll((List<?>) aliases); + } + + ConfigDef newDef = new ConfigDef(baseConfigDef); + String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + "."; + short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG); + int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG); + topicCreationGroups.stream().distinct().forEach(group -> { + if (!(group instanceof String)) { + throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String"); + } + String alias = (String) group; + String prefix = TOPIC_CREATION_PREFIX + alias + "."; + String configGroup = TOPIC_CREATION_GROUP + ": " + alias; + newDef.embed(prefix, configGroup, 0, + TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions)); + }); + return newDef; + } + + @Override + public Object get(String key) { Review comment: I'm fine with not ordering all public static methods first, but I think you'll agree that having this member getter method appearing before the constructor does not follow the conventions and patterns we use throughout the project.
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. 
This list is used to include topics that " + + "match their values and apply this group's specific configuration to the topics " + + "that match this inclusion list."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular " + + "expressions that may match topic names. This list is used to exclude topics that " + + "match their values and refrain from applying this group's specific configuration " + + "to the topics that match this exclusion list. Note that exclusion rules have " + + "precedent and override any inclusion rules for topics. "; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics " + + "created for this connector. This value must not be larger than the number of " + + "brokers in the Kafka cluster, or otherwise an error will be thrown when the " + + "connector will attempt to create a topic. For the default group this configuration" + + " is required. For any other group defined in topic.creation.groups this config is " + + "optional and if it's missing it gets the value the default group"; + + public static final String PARTITIONS_CONFIG = "partitions"; + private static final String PARTITIONS_DOC = "The number of partitions new topics created for" + + " this connector. For the default group this configuration is required. 
For any " + + "other group defined in topic.creation.groups this config is optional and if it's " + + "missing it gets the value the default group"; + + public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validateReplicationFactor(name, (short) value), + () -> "Positive number, or -1 to use the broker's default" + ); + public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> validatePartitions(name, (int) value), + () -> "Positive number, or -1 to use the broker's default" + ); + @SuppressWarnings("unchecked") + public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> { + try { + ((List<String>) value).forEach(Pattern::compile); + } catch (PatternSyntaxException e) { + throw new ConfigException(name, value, "Syntax error in regular expression"); + } + }, + () -> "Positive number, or -1 to use the broker's default" + ); + + private static void validatePartitions(String configName, int factor) { + if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) { + throw new ConfigException(configName, factor, + "Number of partitions must be positive, or -1 to use the broker's default"); + } + } + + private static void validateReplicationFactor(String configName, short factor) { + if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) { + throw new ConfigException(configName, factor, + "Replication factor must be positive, or -1 to use the broker's default"); Review comment: How about: ```suggestion "Replication factor must be positive and not larger than the number of brokers in the Kafka cluster, or -1 to use the broker's default"); ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java ########## @@ -0,0 +1,1490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.integration.MonitorableSourceConnector; +import 
org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.ThreadedTest; +import org.apache.kafka.connect.util.TopicAdmin; +import org.apache.kafka.connect.util.TopicCreation; +import org.apache.kafka.connect.util.TopicCreationGroup; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.api.easymock.annotation.MockStrict; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; 
+import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@PowerMockIgnore({"javax.management.*", + "org.apache.log4j.*"}) 
+@RunWith(PowerMockRunner.class) +public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest { + private static final String TOPIC = "topic"; + private static final String OTHER_TOPIC = "other-topic"; + private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes()); + private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12); + + // Connect-format data + private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; + private static final Integer KEY = -1; + private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA; + private static final Long RECORD = 12L; + // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version + // is used in the right place. + private static final byte[] SERIALIZED_KEY = "converted-key".getBytes(); + private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes(); + + private ExecutorService executor = Executors.newSingleThreadExecutor(); + private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); + private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1); + private WorkerConfig config; + private SourceConnectorConfig sourceConfig; + private Plugins plugins; + private MockConnectMetrics metrics; + @Mock private SourceTask sourceTask; + @Mock private Converter keyConverter; + @Mock private Converter valueConverter; + @Mock private HeaderConverter headerConverter; + @Mock private TransformationChain<SourceRecord> transformationChain; + @Mock private KafkaProducer<byte[], byte[]> producer; + @Mock private TopicAdmin admin; + @Mock private CloseableOffsetStorageReader offsetReader; + @Mock private OffsetStorageWriter offsetWriter; + @Mock private ClusterConfigState clusterConfigState; + private WorkerSourceTask workerTask; + @Mock private Future<RecordMetadata> sendFuture; + @MockStrict private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore 
statusBackingStore; + + private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks; + + private static final Map<String, String> TASK_PROPS = new HashMap<>(); + static { + TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + } + private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); + + private static final List<SourceRecord> RECORDS = Arrays.asList( + new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) + ); + + // when this test becomes parameterized, this variable will be a test parameter + public boolean enableTopicCreation = true; + + @Override + public void setup() { + super.setup(); + Map<String, String> workerProps = workerProps(); + plugins = new Plugins(workerProps); + config = new StandaloneConfig(workerProps); + sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true); + producerCallbacks = EasyMock.newCapture(); + metrics = new MockConnectMetrics(); + } + + private Map<String, String> workerProps() { + Map<String, String> props = new HashMap<>(); + props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + props.put("internal.key.converter.schemas.enable", "false"); + props.put("internal.value.converter.schemas.enable", "false"); + props.put("offset.storage.file.filename", "/tmp/connect.offsets"); + props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation)); + return props; + } + + private Map<String, String> sourceConnectorPropsWithGroups(String topic) { + // setup up props for the source connector + Map<String, String> props = new HashMap<>(); + props.put("name", "foo-connector"); + props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, String.valueOf(1)); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar")); + props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1)); + props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1)); + props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic); + props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*"); + props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic); + return props; + } + + @After + public void tearDown() { + if (metrics != null) metrics.stop(); + } + + private void createWorkerTask() { + createWorkerTask(TargetState.STARTED); + } + + private void createWorkerTask(TargetState initialState) { + createWorkerTask(initialState, keyConverter, valueConverter, headerConverter); + } + + private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { + workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, + transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), + offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, + RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); + } + + @Test + public void testStartPaused() throws Exception { + final CountDownLatch pauseLatch = new CountDownLatch(1); + + createWorkerTask(TargetState.PAUSED); + + statusListener.onPause(taskId); + EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { + @Override + public 
Void answer() throws Throwable { + pauseLatch.countDown(); + return null; + } + }); + + expectClose(); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(pauseLatch.await(5, TimeUnit.SECONDS)); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + + PowerMock.verifyAll(); + } + + @Test + public void testPause() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + AtomicInteger count = new AtomicInteger(0); + CountDownLatch pollLatch = expectPolls(10, count); + // In this test, we don't flush, so nothing goes any further than the offset writer + + expectTopicCreation(TOPIC); + + statusListener.onPause(taskId); + EasyMock.expectLastCall(); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + assertTrue(awaitLatch(pollLatch)); + + workerTask.transitionTo(TargetState.PAUSED); + + int priorCount = count.get(); + Thread.sleep(100); + + // since the transition is observed asynchronously, the count could be off by one loop iteration + assertTrue(count.get() - priorCount <= 1); + + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + + PowerMock.verifyAll(); + } + + @Test + public void testPollsInBackground() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + 
statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + final CountDownLatch pollLatch = expectPolls(10); + // In this test, we don't flush, so nothing goes any further than the offset writer + + expectTopicCreation(TOPIC); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(10); + + PowerMock.verifyAll(); + } + + @Test + public void testFailureInPoll() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + final CountDownLatch pollLatch = new CountDownLatch(1); + final RuntimeException exception = new RuntimeException(); + EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() { + @Override + public List<SourceRecord> answer() throws Throwable { + pollLatch.countDown(); + throw exception; + } + }); + + statusListener.onFailure(taskId, exception); + EasyMock.expectLastCall(); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(0); + + PowerMock.verifyAll(); + } + + @Test + public void testPollReturnsNoRecords() throws Exception { + // Test that the task handles an empty list of records + createWorkerTask(); + + 
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + // We'll wait for some data, then trigger a flush + final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger()); + expectOffsetFlush(true); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + assertTrue(workerTask.commitOffsets()); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(0); + + PowerMock.verifyAll(); + } + + @Test + public void testCommit() throws Exception { + // Test that the task commits properly when prompted + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + // We'll wait for some data, then trigger a flush + final CountDownLatch pollLatch = expectPolls(1); + expectOffsetFlush(true); + + expectTopicCreation(TOPIC); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + assertTrue(workerTask.commitOffsets()); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(1); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitFailure() throws Exception { + // Test that 
the task commits properly when prompted + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + // We'll wait for some data, then trigger a flush + final CountDownLatch pollLatch = expectPolls(1); + expectOffsetFlush(true); + + expectTopicCreation(TOPIC); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(false); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + assertTrue(workerTask.commitOffsets()); + workerTask.stop(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(1); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsConvertsData() throws Exception { + createWorkerTask(); + + List<SourceRecord> records = new ArrayList<>(); + // Can just use the same record for key and value + records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + expectTopicCreation(TOPIC); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); + assertEquals(SERIALIZED_RECORD, sent.getValue().value()); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsPropagatesTimestamp() throws Exception { + final Long timestamp = System.currentTimeMillis(); + + createWorkerTask(); + + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); + + 
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + expectTopicCreation(TOPIC); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(timestamp, sent.getValue().timestamp()); + + PowerMock.verifyAll(); + } + + @Test(expected = InvalidRecordException.class) + public void testSendRecordsCorruptTimestamp() throws Exception { + final Long timestamp = -3L; + createWorkerTask(); + + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(null, sent.getValue().timestamp()); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsNoTimestamp() throws Exception { + final Long timestamp = -1L; + createWorkerTask(); + + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + expectTopicCreation(TOPIC); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(null, sent.getValue().timestamp()); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsRetries() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + 
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectTopicCreation(TOPIC); + + // First round + expectSendRecordOnce(false); + // Any Producer retriable exception should work here + expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure")); + + // Second round + expectSendRecordOnce(true); + expectSendRecordOnce(false); + + PowerMock.replayAll(); + + // Try to send 3, make first pass, second fail. Should save last two + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record2, record3), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test(expected = ConnectException.class) + public void testSendRecordsProducerCallbackFail() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectTopicCreation(TOPIC); + + expectSendRecordProducerCallbackFail(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + @Test(expected = ConnectException.class) + public void testSendRecordsProducerSendFailsImmediately() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord 
record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + expectTopicCreation(TOPIC); + + EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())) + .andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC))); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + @Test + public void testSendRecordsTaskCommitRecordFail() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectTopicCreation(TOPIC); + + // Source task commit record failure will not cause the task to abort + expectSendRecordOnce(false); + expectSendRecordTaskCommitRecordFail(false, false); + expectSendRecordOnce(false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test + public void testSlowTaskStart() throws Exception { + final CountDownLatch startupLatch = new CountDownLatch(1); + final CountDownLatch finishStartupLatch = new CountDownLatch(1); + + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall().andAnswer(new 
IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + startupLatch.countDown(); + assertTrue(awaitLatch(finishStartupLatch)); + return null; + } + }); + + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future<?> workerTaskFuture = executor.submit(workerTask); + + // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, + // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it + // cannot be invoked immediately in the thread trying to stop the task. + assertTrue(awaitLatch(startupLatch)); + workerTask.stop(); + finishStartupLatch.countDown(); + assertTrue(workerTask.awaitStop(1000)); + + workerTaskFuture.get(); + + PowerMock.verifyAll(); + } + + @Test + public void testCancel() { + createWorkerTask(); + + offsetReader.close(); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.cancel(); + + PowerMock.verifyAll(); + } + + @Test + public void testMetricsGroup() { + SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics); + SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics); + for (int i = 0; i != 10; ++i) { + group.recordPoll(100, 1000 + i * 100); + group.recordWrite(10); + } + for (int i = 0; i != 20; ++i) { + group1.recordPoll(100, 1000 + i * 100); + group1.recordWrite(10); + } + assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d); + assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); + assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d); + 
assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d); + assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d); + assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d); + assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d); + + // Close the group + group.close(); + + for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) { + // Metrics for this group should no longer exist + assertFalse(group.metricGroup().groupId().includes(metricName)); + } + // Sensors for this group should no longer exist + assertNull(group.metricGroup().metrics().getSensor("sink-record-read")); + assertNull(group.metricGroup().metrics().getSensor("sink-record-send")); + assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count")); + assertNull(group.metricGroup().metrics().getSensor("partition-count")); + assertNull(group.metricGroup().metrics().getSensor("offset-seq-number")); + assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion")); + assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip")); + assertNull(group.metricGroup().metrics().getSensor("put-batch-time")); + + assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d); + assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); + assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d); + assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d); + assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d); + assertEquals(200, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d); + assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d); + } + + @Test + public void testHeaders() throws Exception { + Headers headers = new RecordHeaders(); + headers.add("header_key", "header_value".getBytes()); + + org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders(); + connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value")); + + createWorkerTask(); + + List<SourceRecord> records = new ArrayList<>(); + records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders)); + + expectTopicCreation(TOPIC); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); + assertEquals(SERIALIZED_RECORD, sent.getValue().value()); + assertEquals(headers, sent.getValue().headers()); + + PowerMock.verifyAll(); + } + + @Test + public void testHeadersWithCustomConverter() throws Exception { + StringConverter stringConverter = new StringConverter(); + TestConverterWithHeaders testConverter = new TestConverterWithHeaders(); + + createWorkerTask(TargetState.STARTED, stringConverter, testConverter, stringConverter); + + List<SourceRecord> records = new ArrayList<>(); + + String stringA = "Árvíztűrő tükörfúrógép"; + org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders(); + String encodingA = "latin2"; + headersA.addString("encoding", encodingA); + + records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA)); + + String stringB = "Тестовое сообщение"; + 
org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders(); + String encodingB = "koi8_r"; + headersB.addString("encoding", encodingB); + + records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB)); + + expectTopicCreation(TOPIC); + + Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null); + Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + + assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key())); + assertEquals( + ByteBuffer.wrap(stringA.getBytes(encodingA)), + ByteBuffer.wrap(sentRecordA.getValue().value()) + ); + assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value())); + + assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key())); + assertEquals( + ByteBuffer.wrap(stringB.getBytes(encodingB)), + ByteBuffer.wrap(sentRecordB.getValue().value()) + ); + assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value())); + + PowerMock.verifyAll(); + } + + @Test + public void testTopicCreateWhenTopicExists() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList()); + TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo)); + 
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc)); + + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + @Test + public void testSendRecordsTopicDescribeRetries() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + // First round - call to describe the topic times out + EasyMock.expect(admin.describeTopics(TOPIC)) + .andThrow(new RetriableException(new TimeoutException("timeout"))); + + // Second round - calls to describe and create succeed + expectTopicCreation(TOPIC); + // Exactly two records are sent + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + } + + @Test + public void testSendRecordsTopicCreateRetries() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, 
TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + // First round - describe finds no existing topic and the call to create the topic times out + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); + EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + .andThrow(new RetriableException(new TimeoutException("timeout"))); + + // Second round + expectTopicCreation(TOPIC); + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + } + + @Test + public void testSendRecordsTopicDescribeRetriesMidway() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + // First round + expectPreliminaryCalls(OTHER_TOPIC); + expectTopicCreation(TOPIC); + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + // First call to describe the topic times out + EasyMock.expect(admin.describeTopics(OTHER_TOPIC)) + 
.andThrow(new RetriableException(new TimeoutException("timeout"))); + + // Second round + expectTopicCreation(OTHER_TOPIC); + expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + + PowerMock.replayAll(); + + // Try to send 3: the first two pass, the third fails on topic describe. Should save the last one + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsTopicCreateRetriesMidway() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + // First round + expectPreliminaryCalls(OTHER_TOPIC); + expectTopicCreation(TOPIC); + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap()); + // First call to create the topic times out + Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); + EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + .andThrow(new RetriableException(new TimeoutException("timeout"))); + + // Second round + 
expectTopicCreation(OTHER_TOPIC); + expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + + PowerMock.replayAll(); + + // Try to send 3, make first pass, second fail. Should save last two + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test(expected = ConnectException.class) + public void testTopicDescribeFails() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)) + .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized"))); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + @Test(expected = ConnectException.class) + public void testTopicCreateFails() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + + Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); + 
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized"))); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertNotNull(newTopicCapture.getValue()); + } + + @Test(expected = ConnectException.class) + public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + + Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); + EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertNotNull(newTopicCapture.getValue()); + } + + @Test + public void testTopicCreationClassWhenTopicCreationIsEnabled() { + TopicCreationGroup expectedDefaultGroup = + TopicCreationGroup.configuredGroups(sourceConfig).get(DEFAULT_TOPIC_CREATION_GROUP); + + TopicCreation topicCreation = TopicCreation.newTopicCreation(config, + TopicCreationGroup.configuredGroups(sourceConfig)); + + assertTrue(topicCreation.isTopicCreationEnabled()); + assertTrue(topicCreation.isTopicCreationRequired(TOPIC)); + assertThat(topicCreation.defaultTopicGroup(), is(expectedDefaultGroup)); + assertEquals(2, topicCreation.topicGroups().size()); + assertThat(topicCreation.topicGroups().keySet(), hasItems("foo", "bar")); + topicCreation.addTopic(TOPIC); + assertFalse(topicCreation.isTopicCreationRequired(TOPIC)); + } + + @Test + 
public void testTopicCreationClassWhenTopicCreationIsDisabled() { + Map<String, String> workerProps = workerProps(); + workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false)); + config = new StandaloneConfig(workerProps); + + TopicCreation topicCreation = TopicCreation.newTopicCreation(config, + TopicCreationGroup.configuredGroups(sourceConfig)); + + assertFalse(topicCreation.isTopicCreationEnabled()); + assertFalse(topicCreation.isTopicCreationRequired(TOPIC)); + assertNull(topicCreation.defaultTopicGroup()); + assertEquals(0, topicCreation.topicGroups().size()); + assertThat(topicCreation.topicGroups(), is(Collections.emptyMap())); + topicCreation.addTopic(TOPIC); + assertFalse(topicCreation.isTopicCreationRequired(TOPIC)); + } + + @Test + public void testEmptyTopicCreationClass() { + TopicCreation topicCreation = TopicCreation.newTopicCreation(config, null); + + assertFalse(topicCreation.isTopicCreationEnabled()); + assertFalse(topicCreation.isTopicCreationRequired(TOPIC)); + assertNull(topicCreation.defaultTopicGroup()); + assertEquals(0, topicCreation.topicGroups().size()); + assertThat(topicCreation.topicGroups(), is(Collections.emptyMap())); + topicCreation.addTopic(TOPIC); + assertFalse(topicCreation.isTopicCreationRequired(TOPIC)); + } Review comment: This test class is already fairly complex. Can these move to a new `TopicCreationTest` class to correspond to the new `TopicCreation` class? 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -166,11 +176,14 @@ protected void close() { log.warn("Could not close producer", t); } } - try { - transformationChain.close(); - } catch (Throwable t) { - log.warn("Could not close transformation chain", t); + if (admin != null) { + try { + admin.close(Duration.ofSeconds(30)); + } catch (Throwable t) { + log.warn("Failed to close admin client on time", t); + } Review comment: Should this have a `finally` block that nulls the `admin` field? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java ########## @@ -16,21 +16,167 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.runtime.isolation.Plugins; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; public class SourceConnectorConfig extends ConnectorConfig { - private static ConfigDef config = ConnectorConfig.configDef(); + protected static final String TOPIC_CREATION_GROUP = "Topic Creation"; + + public static final String TOPIC_CREATION_PREFIX = 
"topic.creation."; + + public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups"; + private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics " + + "created by source connectors"; + private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups"; + + private static class EnrichedSourceConnectorConfig extends AbstractConfig { + EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) { + super(configDef, props); + } + + @Override + public Object get(String key) { + return super.get(key); + } + } + + private static ConfigDef config = SourceConnectorConfig.configDef(); + private final EnrichedSourceConnectorConfig enrichedSourceConfig; public static ConfigDef configDef() { - return config; + int orderInGroup = 0; + return new ConfigDef(ConnectorConfig.configDef()) + .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), + ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with( + (name, value) -> { + List<?> groupAliases = (List<?>) value; + if (groupAliases.size() > new HashSet<>(groupAliases).size()) { + throw new ConfigException(name, value, "Duplicate alias provided."); + } + }, + () -> "unique topic creation groups")), + ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP, + ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY); } - public SourceConnectorConfig(Plugins plugins, Map<String, String> props) { + public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) { + String defaultGroup = "default"; + ConfigDef newDefaultDef = new ConfigDef(baseConfigDef); + newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef()); + return newDefaultDef; + } + + /** + * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} 
as an input. + * + * @param baseConfigDef the base configuration definition to be enriched + * @param props the non parsed configuration properties + * @return the enriched configuration definition + */ + public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) { + List<Object> topicCreationGroups = new ArrayList<>(); + Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST); + if (aliases instanceof List) { + topicCreationGroups.addAll((List<?>) aliases); + } + + ConfigDef newDef = new ConfigDef(baseConfigDef); + String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + "."; + short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG); + int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG); + topicCreationGroups.stream().distinct().forEach(group -> { + if (!(group instanceof String)) { + throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String"); + } + String alias = (String) group; + String prefix = TOPIC_CREATION_PREFIX + alias + "."; + String configGroup = TOPIC_CREATION_GROUP + ": " + alias; + newDef.embed(prefix, configGroup, 0, + TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions)); + }); + return newDef; + } + + public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) { super(plugins, config, props); + if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) { + ConfigDef defaultConfigDef = embedDefaultGroup(config); + // This config is only used to set default values for partitions and replication + // factor from the default group and otherwise it remains unused + AbstractConfig defaultGroup = new 
AbstractConfig(defaultConfigDef, props, false); + + // If the user has added regex of include or exclude patterns in the default group, + // they should be ignored. Review comment: Nice catch. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.util.TopicAdmin; + +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class TopicCreationConfig { + + public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default."; + public static final String DEFAULT_TOPIC_CREATION_GROUP = "default"; + + public static final String INCLUDE_REGEX_CONFIG = "include"; + private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals " + + "used to match the names topics used by the source connector. 
This list is used " + + "to include topics that should be created using the topic settings defined by this group."; + + public static final String EXCLUDE_REGEX_CONFIG = "exclude"; + private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals " Review comment: This is misnamed, and that was my fault when I made a wrong suggestion earlier. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org