C0urante commented on code in PR #14303: URL: https://github.com/apache/kafka/pull/14303#discussion_r1309049237
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ########## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.transforms.Filter; +import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG; + +/** + * Integration test for preflight connector config validation + */ +@Category(IntegrationTest.class) +public class ConnectorValidationIntegrationTest { + + private static final String WORKER_GROUP_ID = "connect-worker-group-id"; + + // Use a single embedded cluster for all test cases in order to cut down on runtime + private static EmbeddedConnectCluster connect; + + @BeforeClass + public static void setup() { + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); + + // build a Connect cluster backed by Kafka and Zk + connect = new EmbeddedConnectCluster.Builder() + .name("connector-validation-connect-cluster") + .workerProps(workerProps) + .build(); + connect.start(); + } + + @AfterClass + public static void close() { + if (connect != null) { + // stop all Connect, Kafka and Zk threads. + Utils.closeQuietly(connect::stop, "Embedded Connect cluster"); + } + } + + @Test + public void testSinkConnectorHasNeitherTopicsListNorTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.remove(TOPICS_CONFIG); + config.remove(TOPICS_REGEX_CONFIG); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 2, // One error each for topics list and topics regex + "Sink connector config should fail preflight validation when neither topics list nor topics regex are provided", + 0 + ); + } + + @Test + public void testSinkConnectorHasBothTopicsListAndTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_CONFIG, "t1"); + config.put(TOPICS_REGEX_CONFIG, "r.*"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 2, // One error each for topics list and topics regex + "Sink connector config should fail preflight validation when both topics list and topics regex are provided", + 0 + ); + } + + @Test + public void testSinkConnectorDeadLetterQueueTopicInTopicsList() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_CONFIG, "t1"); + config.put(DLQ_TOPIC_NAME_CONFIG, "t1"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when DLQ topic is included in topics list", + 0 + ); + } + + @Test + public void testSinkConnectorDeadLetterQueueTopicMatchesTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_REGEX_CONFIG, "r.*"); + config.put(DLQ_TOPIC_NAME_CONFIG, "ruh.roh"); + config.remove(TOPICS_CONFIG); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when DLQ topic matches topics regex", + 0 + ); + } + + @Test + public void testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + // Combined with the logic in SinkUtils::consumerGroupId, this should conflict with the worker group ID + config.put(NAME_CONFIG, "worker-group-id"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when default consumer group ID conflicts with Connect worker group ID", + 0 + ); + } + + @Test + public void testSinkConnectorOverriddenGroupIdConflictsWithWorkerGroupId() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + GROUP_ID_CONFIG, WORKER_GROUP_ID); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when overridden consumer group ID conflicts with Connect worker group ID", + 0 + ); + } + + @Test + public void testSourceConnectorHasDuplicateTopicCreationGroups() throws InterruptedException { + Map<String, String> config = defaultSourceConnectorProps(); + config.put(TOPIC_CREATION_GROUPS_CONFIG, "g1, g2, g1"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Source connector config should fail preflight validation when the same topic creation group is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasDuplicateTransformations() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName + ", " + transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when the same transformation is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasMissingTransformClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a transformation with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasInvalidTransformClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a transformation with a class of the wrong type is specified", + 0 + ); + } + + @Test + public void testConnectorHasNegationForUndefinedPredicate() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName()); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".negate", "true"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when an undefined predicate is negated", + 0 + ); + } + + @Test + public void testConnectorHasDuplicatePredicates() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName + ", " + predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", RecordIsTombstone.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when the same predicate is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasMissingPredicateClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a predicate with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasInvalidPredicateClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a predicate with a class of the wrong type is specified", + 0 + ); + } + + @Test + public void testConnectorHasMissingConverterClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a converter with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a header converter with a class not found on the worker is specified", + 0 + ); + } + + private Map<String, String> defaultSourceConnectorProps() { + // setup up props for the source connector + Map<String, String> props = new HashMap<>(); + props.put(NAME_CONFIG, "source-connector"); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPIC_CONFIG, "t1"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + + private Map<String, String> defaultSinkConnectorProps() { + // setup up props for the sink connector + Map<String, String> props = new HashMap<>(); + props.put(NAME_CONFIG, "sink-connector"); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPICS_CONFIG, "t1"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + +} Review Comment: Ack, done 👍 ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ########## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.transforms.Filter; +import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_REGEX_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG; + +/** + * Integration test for preflight connector config validation + */ +@Category(IntegrationTest.class) +public class ConnectorValidationIntegrationTest { + + private static final String WORKER_GROUP_ID = "connect-worker-group-id"; + + // Use a single embedded cluster for all test cases in order to cut down on runtime + private static EmbeddedConnectCluster connect; + + @BeforeClass + public static void setup() { + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); + + // build a Connect cluster backed by Kafka and Zk + connect = new EmbeddedConnectCluster.Builder() + .name("connector-validation-connect-cluster") + .workerProps(workerProps) + .build(); + connect.start(); + } + + @AfterClass + public static void close() { + if (connect != null) { + // stop all Connect, Kafka and Zk threads. + Utils.closeQuietly(connect::stop, "Embedded Connect cluster"); + } + } + + @Test + public void testSinkConnectorHasNeitherTopicsListNorTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.remove(TOPICS_CONFIG); + config.remove(TOPICS_REGEX_CONFIG); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 2, // One error each for topics list and topics regex + "Sink connector config should fail preflight validation when neither topics list nor topics regex are provided", + 0 + ); + } + + @Test + public void testSinkConnectorHasBothTopicsListAndTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_CONFIG, "t1"); + config.put(TOPICS_REGEX_CONFIG, "r.*"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 2, // One error each for topics list and topics regex + "Sink connector config should fail preflight validation when both topics list and topics regex are provided", + 0 + ); + } + + @Test + public void testSinkConnectorDeadLetterQueueTopicInTopicsList() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_CONFIG, "t1"); + config.put(DLQ_TOPIC_NAME_CONFIG, "t1"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when DLQ topic is included in topics list", + 0 + ); + } + + @Test + public void testSinkConnectorDeadLetterQueueTopicMatchesTopicsRegex() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(TOPICS_REGEX_CONFIG, "r.*"); + config.put(DLQ_TOPIC_NAME_CONFIG, "ruh.roh"); + config.remove(TOPICS_CONFIG); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when DLQ topic matches topics regex", + 0 + ); + } + + @Test + public void testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + // Combined with the logic in SinkUtils::consumerGroupId, this should conflict with the worker group ID + config.put(NAME_CONFIG, "worker-group-id"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when default consumer group ID conflicts with Connect worker group ID", + 0 + ); + } + + @Test + public void testSinkConnectorOverriddenGroupIdConflictsWithWorkerGroupId() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + GROUP_ID_CONFIG, WORKER_GROUP_ID); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Sink connector config should fail preflight validation when overridden consumer group ID conflicts with Connect worker group ID", + 0 + ); + } + + @Test + public void testSourceConnectorHasDuplicateTopicCreationGroups() throws InterruptedException { + Map<String, String> config = defaultSourceConnectorProps(); + config.put(TOPIC_CREATION_GROUPS_CONFIG, "g1, g2, g1"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Source connector config should fail preflight validation when the same topic creation group is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasDuplicateTransformations() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName + ", " + transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when the same transformation is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasMissingTransformClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a transformation with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasInvalidTransformClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a transformation with a class of the wrong type is specified", + 0 + ); + } + + @Test + public void testConnectorHasNegationForUndefinedPredicate() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String transformName = "t"; + config.put(TRANSFORMS_CONFIG, transformName); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName()); + config.put(TRANSFORMS_CONFIG + "." + transformName + ".negate", "true"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when an undefined predicate is negated", + 0 + ); + } + + @Test + public void testConnectorHasDuplicatePredicates() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName + ", " + predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", RecordIsTombstone.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when the same predicate is specified multiple times", + 0 + ); + } + + @Test + public void testConnectorHasMissingPredicateClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a predicate with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasInvalidPredicateClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + String predicateName = "p"; + config.put(PREDICATES_CONFIG, predicateName); + config.put(PREDICATES_CONFIG + "." + predicateName + ".type", MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a predicate with a class of the wrong type is specified", + 0 + ); + } + + @Test + public void testConnectorHasMissingConverterClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a converter with a class not found on the worker is specified", + 0 + ); + } + + @Test + public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException { + Map<String, String> config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, "WheresTheFruit"); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a header converter with a class not found on the worker is specified", + 0 + ); + } + + private Map<String, String> defaultSourceConnectorProps() { + // setup up props for the source connector + Map<String, String> props = new HashMap<>(); + props.put(NAME_CONFIG, "source-connector"); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPIC_CONFIG, "t1"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + + private Map<String, String> defaultSinkConnectorProps() { + // setup up props for the sink connector + Map<String, String> props = new HashMap<>(); + props.put(NAME_CONFIG, "sink-connector"); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPICS_CONFIG, "t1"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + +} Review Comment: Ack, done 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java: ########## @@ -90,50 +93,83 @@ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) { * @param props sink configuration properties */ public static void validate(Map<String, String> props) { - final boolean hasTopicsConfig = hasTopicsConfig(props); - final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); - final boolean hasDlqTopicConfig = hasDlqTopicConfig(props); + Map<String, ConfigValue> validatedConfig = new LinkedHashMap<>(); + validate(props, validatedConfig); + validatedConfig.values().stream() + .filter(configValue -> !configValue.errorMessages().isEmpty()) + .findFirst() + .ifPresent(configValue -> { + throw new ConfigException(configValue.name(), configValue.value(), configValue.errorMessages().get(0)); + }); + } + + /** + * Perform preflight validation for the sink-specific properties for a connector. + * + * @param props the configuration for the sink connector + * @param validatedConfig any already-known {@link ConfigValue validation results} for the configuration. + * May be empty, but may not be null. Any configuration errors discovered by this method will + * be {@link ConfigValue#addErrorMessage(String) added} to a value in this map, adding a new + * entry if one for the problematic property does not already exist. + */ + public static void validate(Map<String, String> props, Map<String, ConfigValue> validatedConfig) { + final String topicsList = props.get(TOPICS_CONFIG); + final String topicsRegex = props.get(TOPICS_REGEX_CONFIG); + final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim(); + final boolean hasTopicsConfig = !Utils.isBlank(topicsList); + final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex); + final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic); if (hasTopicsConfig && hasTopicsRegexConfig) { - throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG + - " are mutually exclusive options, but both are set."); + String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set."; + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); + addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage); } if (!hasTopicsConfig && !hasTopicsRegexConfig) { - throw new ConfigException("Must configure one of " + - SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); + String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG; + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); + addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage); } if (hasDlqTopicConfig) { - String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim(); if (hasTopicsConfig) { List<String> topics = parseTopicsList(props); if (topics.contains(dlqTopic)) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector", + dlqTopic, TOPICS_CONFIG, topics + ); + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); } } if (hasTopicsRegexConfig) { - String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG); - Pattern pattern = Pattern.compile(topicsRegexStr); + Pattern pattern = Pattern.compile(topicsRegex); if (pattern.matcher(dlqTopic).matches()) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector", Review Comment: This is the existing language on trunk, but I agree that "match" is clearer than "include". Done 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java: ########## @@ -90,50 +93,83 @@ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) { * @param props sink configuration properties */ public static void validate(Map<String, String> props) { - final boolean hasTopicsConfig = hasTopicsConfig(props); - final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); - final boolean hasDlqTopicConfig = hasDlqTopicConfig(props); + Map<String, ConfigValue> validatedConfig = new LinkedHashMap<>(); + validate(props, validatedConfig); + validatedConfig.values().stream() + .filter(configValue -> !configValue.errorMessages().isEmpty()) + .findFirst() + .ifPresent(configValue -> { + throw new ConfigException(configValue.name(), configValue.value(), configValue.errorMessages().get(0)); + }); + } + + /** + * Perform preflight validation for the sink-specific properties for a connector. + * + * @param props the configuration for the sink connector + * @param validatedConfig any already-known {@link ConfigValue validation results} for the configuration. + * May be empty, but may not be null. Any configuration errors discovered by this method will + * be {@link ConfigValue#addErrorMessage(String) added} to a value in this map, adding a new + * entry if one for the problematic property does not already exist. + */ + public static void validate(Map<String, String> props, Map<String, ConfigValue> validatedConfig) { + final String topicsList = props.get(TOPICS_CONFIG); + final String topicsRegex = props.get(TOPICS_REGEX_CONFIG); + final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim(); + final boolean hasTopicsConfig = !Utils.isBlank(topicsList); + final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex); + final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic); if (hasTopicsConfig && hasTopicsRegexConfig) { - throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG + - " are mutually exclusive options, but both are set."); + String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set."; + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); + addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage); } if (!hasTopicsConfig && !hasTopicsRegexConfig) { - throw new ConfigException("Must configure one of " + - SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); + String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG; + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); + addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage); } if (hasDlqTopicConfig) { - String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim(); if (hasTopicsConfig) { List<String> topics = parseTopicsList(props); if (topics.contains(dlqTopic)) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector", + dlqTopic, TOPICS_CONFIG, topics + ); + addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage); } } if (hasTopicsRegexConfig) { - String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG); - Pattern pattern = Pattern.compile(topicsRegexStr); + Pattern pattern = Pattern.compile(topicsRegex); if (pattern.matcher(dlqTopic).matches()) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector", Review Comment: This is the existing language on trunk, but I agree that "match" is clearer than "include". Done 👍 ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java: ########## @@ -239,7 +239,23 @@ protected Optional<Boolean> checkTopicSettings(String topicName, int replicas, i * @param numErrors the number of errors expected */ public void assertExactlyNumErrorsOnConnectorConfigValidation(String connectorClass, Map<String, String> connConfig, - int numErrors, String detailMessage) throws InterruptedException { + int numErrors, String detailMessage) throws InterruptedException { + assertExactlyNumErrorsOnConnectorConfigValidation(connectorClass, connConfig, numErrors, detailMessage, VALIDATION_DURATION_MS); + } + + /** + * Assert that the required number of errors are produced by a connector config validation. + * + * @param connectorClass the class of the connector to validate + * @param connConfig the intended configuration + * @param numErrors the number of errors expected + * @param timeout how long to retry for before throwing an exception Review Comment: Good catch! TBF it's also missing [on trunk](https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L234-L240), but it doesn't hurt to add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org