C0urante commented on code in PR #14303:
URL: https://github.com/apache/kafka/pull/14303#discussion_r1309049237


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+
+/**
+ * Integration test for preflight connector config validation
+ */
+@Category(IntegrationTest.class)
+public class ConnectorValidationIntegrationTest {
+
+    private static final String WORKER_GROUP_ID = "connect-worker-group-id";
+
+    // Use a single embedded cluster for all test cases in order to cut down on runtime
+    private static EmbeddedConnectCluster connect;
+
+    @BeforeClass
+    public static void setup() {
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connector-validation-connect-cluster")
+                .workerProps(workerProps)
+                .build();
+        connect.start();
+    }
+
+    @AfterClass
+    public static void close() {
+        if (connect != null) {
+            // stop all Connect, Kafka and Zk threads.
+            Utils.closeQuietly(connect::stop, "Embedded Connect cluster");
+        }
+    }
+
+    @Test
+    public void testSinkConnectorHasNeitherTopicsListNorTopicsRegex() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.remove(TOPICS_CONFIG);
+        config.remove(TOPICS_REGEX_CONFIG);
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                2, // One error each for topics list and topics regex
+                "Sink connector config should fail preflight validation when 
neither topics list nor topics regex are provided",
+                0
+        );
+    }
+
+    @Test
+    public void testSinkConnectorHasBothTopicsListAndTopicsRegex() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_CONFIG, "t1");
+        config.put(TOPICS_REGEX_CONFIG, "r.*");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                2, // One error each for topics list and topics regex
+                "Sink connector config should fail preflight validation when 
both topics list and topics regex are provided",
+                0
+        );
+    }
+
+    @Test
+    public void testSinkConnectorDeadLetterQueueTopicInTopicsList() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_CONFIG, "t1");
+        config.put(DLQ_TOPIC_NAME_CONFIG, "t1");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Sink connector config should fail preflight validation when 
DLQ topic is included in topics list",
+                0
+        );
+    }
+
+    @Test
+    public void testSinkConnectorDeadLetterQueueTopicMatchesTopicsRegex() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_REGEX_CONFIG, "r.*");
+        config.put(DLQ_TOPIC_NAME_CONFIG, "ruh.roh");
+        config.remove(TOPICS_CONFIG);
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Sink connector config should fail preflight validation when 
DLQ topic matches topics regex",
+                0
+        );
+    }
+
+    @Test
+    public void testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        // Combined with the logic in SinkUtils::consumerGroupId, this should conflict with the worker group ID
+        config.put(NAME_CONFIG, "worker-group-id");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Sink connector config should fail preflight validation when 
default consumer group ID conflicts with Connect worker group ID",
+                0
+        );
+    }
+
+    @Test
+    public void testSinkConnectorOverriddenGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + GROUP_ID_CONFIG, WORKER_GROUP_ID);
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Sink connector config should fail preflight validation when 
overridden consumer group ID conflicts with Connect worker group ID",
+                0
+        );
+    }
+
+    @Test
+    public void testSourceConnectorHasDuplicateTopicCreationGroups() throws InterruptedException {
+        Map<String, String> config = defaultSourceConnectorProps();
+        config.put(TOPIC_CREATION_GROUPS_CONFIG, "g1, g2, g1");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Source connector config should fail preflight validation when 
the same topic creation group is specified multiple times",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasDuplicateTransformations() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String transformName = "t";
+        config.put(TRANSFORMS_CONFIG, transformName + ", " + transformName);
+        config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when the 
same transformation is specified multiple times",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasMissingTransformClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String transformName = "t";
+        config.put(TRANSFORMS_CONFIG, transformName);
+        config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", "WheresTheFruit");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
transformation with a class not found on the worker is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasInvalidTransformClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String transformName = "t";
+        config.put(TRANSFORMS_CONFIG, transformName);
+        config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", MonitorableSinkConnector.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
transformation with a class of the wrong type is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasNegationForUndefinedPredicate() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String transformName = "t";
+        config.put(TRANSFORMS_CONFIG, transformName);
+        config.put(TRANSFORMS_CONFIG + "." + transformName + ".type", Filter.class.getName());
+        config.put(TRANSFORMS_CONFIG + "." + transformName + ".negate", "true");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when an 
undefined predicate is negated",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasDuplicatePredicates() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String predicateName = "p";
+        config.put(PREDICATES_CONFIG, predicateName + ", " + predicateName);
+        config.put(PREDICATES_CONFIG + "." + predicateName + ".type", RecordIsTombstone.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when the 
same predicate is specified multiple times",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasMissingPredicateClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String predicateName = "p";
+        config.put(PREDICATES_CONFIG, predicateName);
+        config.put(PREDICATES_CONFIG + "." + predicateName + ".type", "WheresTheFruit");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
predicate with a class not found on the worker is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasInvalidPredicateClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        String predicateName = "p";
+        config.put(PREDICATES_CONFIG, predicateName);
+        config.put(PREDICATES_CONFIG + "." + predicateName + ".type", MonitorableSinkConnector.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
predicate with a class of the wrong type is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasMissingConverterClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(KEY_CONVERTER_CLASS_CONFIG, "WheresTheFruit");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
converter with a class not found on the worker is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(HEADER_CONVERTER_CLASS_CONFIG, "WheresTheFruit");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a 
header converter with a class not found on the worker is specified",
+                0
+        );
+    }
+
+    private Map<String, String> defaultSourceConnectorProps() {
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(NAME_CONFIG, "source-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, "1");
+        props.put(TOPIC_CONFIG, "t1");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        return props;
+    }
+
+    private Map<String, String> defaultSinkConnectorProps() {
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(NAME_CONFIG, "sink-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, "1");
+        props.put(TOPICS_CONFIG, "t1");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        return props;
+    }
+
+}

Review Comment:
   Ack, done 👍
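
   For context on the group ID conflict tests above: the comment in `testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId` relies on how `SinkUtils::consumerGroupId` derives a consumer group ID from the connector name. A minimal sketch of that derivation, assuming the helper simply prepends a `connect-` prefix to the connector name (the exact prefix is an assumption for illustration, not quoted from the PR):

   ```java
   // Hedged sketch: how a sink connector name can collide with the worker's group ID.
   // Assumes SinkUtils.consumerGroupId prefixes the connector name with "connect-".
   public final class GroupIdConflictSketch {
       // Mirrors the shape of org.apache.kafka.connect.util.SinkUtils#consumerGroupId
       static String consumerGroupId(String connectorName) {
           return "connect-" + connectorName;
       }

       public static void main(String[] args) {
           String workerGroupId = "connect-worker-group-id";
           // The test names the connector "worker-group-id", so the derived consumer
           // group ID becomes "connect-worker-group-id" -- identical to the worker's
           // own group ID, which preflight validation must reject.
           System.out.println(consumerGroupId("worker-group-id").equals(workerGroupId)); // true
       }
   }
   ```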



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java:
##########
@@ -90,50 +93,83 @@ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
      * @param props sink configuration properties
      */
     public static void validate(Map<String, String> props) {
-        final boolean hasTopicsConfig = hasTopicsConfig(props);
-        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
-        final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+        Map<String, ConfigValue> validatedConfig = new LinkedHashMap<>();
+        validate(props, validatedConfig);
+        validatedConfig.values().stream()
+                .filter(configValue -> !configValue.errorMessages().isEmpty())
+                .findFirst()
+                .ifPresent(configValue -> {
+                    throw new ConfigException(configValue.name(), configValue.value(), configValue.errorMessages().get(0));
+                });
+    }
+
+    /**
+     * Perform preflight validation for the sink-specific properties for a connector.
+     *
+     * @param props           the configuration for the sink connector
+     * @param validatedConfig any already-known {@link ConfigValue validation results} for the configuration.
+     *                        May be empty, but may not be null. Any configuration errors discovered by this method will
+     *                        be {@link ConfigValue#addErrorMessage(String) added} to a value in this map, adding a new
+     *                        entry if one for the problematic property does not already exist.
+     */
+    public static void validate(Map<String, String> props, Map<String, ConfigValue> validatedConfig) {
+        final String topicsList = props.get(TOPICS_CONFIG);
+        final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+        final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim();
+        final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+        final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+        final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);
 
         if (hasTopicsConfig && hasTopicsRegexConfig) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
+            String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+            addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage);
+            addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage);
         }
 
         if (!hasTopicsConfig && !hasTopicsRegexConfig) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+            String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG;
+            addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage);
+            addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage);
         }
 
         if (hasDlqTopicConfig) {
-            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
             if (hasTopicsConfig) {
                 List<String> topics = parseTopicsList(props);
                 if (topics.contains(dlqTopic)) {
-                    throw new ConfigException(String.format("The DLQ topic 
'%s' may not be included in the list of "
-                            + "topics ('%s=%s') consumed by the connector", 
dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the 
list of topics ('%s=%s') consumed by the connector",
+                            dlqTopic, TOPICS_CONFIG, topics
+                    );
+                    addErrorMessage(validatedConfig, TOPICS_CONFIG, 
topicsList, errorMessage);
                 }
             }
             if (hasTopicsRegexConfig) {
-                String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG);
-                Pattern pattern = Pattern.compile(topicsRegexStr);
+                Pattern pattern = Pattern.compile(topicsRegex);
                 if (pattern.matcher(dlqTopic).matches()) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector",

Review Comment:
   This is the existing language on trunk, but I agree that "match" is clearer than "include". Done 👍



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java:
##########
@@ -239,7 +239,23 @@ protected Optional<Boolean> checkTopicSettings(String topicName, int replicas, i
      * @param numErrors      the number of errors expected
      */
     public void assertExactlyNumErrorsOnConnectorConfigValidation(String connectorClass, Map<String, String> connConfig,
-        int numErrors, String detailMessage) throws InterruptedException {
+                                                                  int numErrors, String detailMessage) throws InterruptedException {
+        assertExactlyNumErrorsOnConnectorConfigValidation(connectorClass, connConfig, numErrors, detailMessage, VALIDATION_DURATION_MS);
+    }
+
+    /**
+     * Assert that the required number of errors is produced by a connector config validation.
+     *
+     * @param connectorClass the class of the connector to validate
+     * @param connConfig     the intended configuration
+     * @param numErrors      the number of errors expected
+     * @param timeout        how long to retry for before throwing an exception

Review Comment:
   Good catch! TBF it's also missing [on trunk](https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L234-L240), but it doesn't hurt to add.
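
   For reference, a hedged sketch of how the timeout-accepting overload might be shaped, assuming a boolean-returning helper (here called `checkValidationErrors`, a hypothetical name) that queries the validation result, and a static import of `org.apache.kafka.test.TestUtils.waitForCondition`:

   ```java
   // Sketch only; checkValidationErrors stands in for whatever the class
   // actually uses to count validation errors for the connector config.
   public void assertExactlyNumErrorsOnConnectorConfigValidation(String connectorClass, Map<String, String> connConfig,
                                                                 int numErrors, String detailMessage, long timeout) throws InterruptedException {
       try {
           // Retry until the validation result reports exactly numErrors errors,
           // or the supplied timeout elapses.
           waitForCondition(
                   () -> checkValidationErrors(connectorClass, connConfig, numErrors),
                   timeout,
                   "Didn't observe the exact requested number of validation errors: " + numErrors);
       } catch (AssertionError e) {
           throw new AssertionError(detailMessage, e);
       }
   }
   ```

   Passing `0` for `timeout`, as the new integration tests do, effectively performs a single immediate check, which fits a synchronous preflight validation call.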



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

