This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f0a4059083 KAFKA-15575: Begin enforcing 'tasks.max' property for 
connectors (#15180)
4f0a4059083 is described below

commit 4f0a40590833a141c78341ce95ffc782747c5ac8
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Thu Feb 1 11:33:04 2024 -0500

    KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180)
    
    Reviewers: Ashwin Pankaj <apan...@confluent.io>, Greg Harris 
<greg.har...@aiven.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/common/utils/LogCaptureAppender.java     |   8 +
 .../kafka/connect/runtime/ConnectorConfig.java     |  19 ++
 .../connect/runtime/TooManyTasksException.java     |  43 ++++
 .../org/apache/kafka/connect/runtime/Worker.java   |  79 ++++--
 .../kafka/connect/runtime/WorkerConnector.java     |  31 ++-
 .../runtime/distributed/DistributedHerder.java     |   6 +-
 .../connect/integration/BlockingConnectorTest.java |  24 +-
 .../integration/ConnectWorkerIntegrationTest.java  | 159 ++++++++++++
 .../integration/MonitorableSourceConnector.java    |  20 +-
 .../integration/OffsetsApiIntegrationTest.java     |   4 +
 .../kafka/connect/runtime/AbstractHerderTest.java  |  12 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 274 ++++++++++++++++++++-
 .../runtime/distributed/DistributedHerderTest.java |  42 ++++
 .../connect/util/clusters/ConnectAssertions.java   |  23 ++
 .../connect/util/clusters/EmbeddedConnect.java     |  17 ++
 16 files changed, 721 insertions(+), 42 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9b63e009693..925c50dec86 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -151,7 +151,7 @@
               files="LoggingResource.java" />
 
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/>
+              
files="(RestServer|AbstractHerder|DistributedHerder|Worker(Test)?).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
               files="JsonConverter.java"/>
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java 
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
index 4f035840bd2..eb592c11863 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
@@ -24,6 +24,7 @@ import org.apache.log4j.spi.LoggingEvent;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class LogCaptureAppender extends AppenderSkeleton implements 
AutoCloseable {
     private final List<LoggingEvent> events = new LinkedList<>();
@@ -100,6 +101,13 @@ public class LogCaptureAppender extends AppenderSkeleton 
implements AutoCloseabl
         }
     }
 
+    public List<String> getMessages(String level) {
+        return getEvents().stream()
+                .filter(e -> level.equals(e.getLevel()))
+                .map(Event::getMessage)
+                .collect(Collectors.toList());
+    }
+
     public List<String> getMessages() {
         final LinkedList<String> result = new LinkedList<>();
         synchronized (events) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index f33f00e40ef..d5cdc23fa36 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -115,6 +115,16 @@ public class ConnectorConfig extends AbstractConfig {
 
     private static final String TASK_MAX_DISPLAY = "Tasks max";
 
+    public static final String TASKS_MAX_ENFORCE_CONFIG = "tasks.max.enforce";
+    private static final String TASKS_MAX_ENFORCE_DOC =
+            "(Deprecated) Whether to enforce that the tasks.max property is 
respected by the connector. "
+                    + "By default, connectors that generate too many tasks 
will fail, and existing sets of tasks that exceed the tasks.max property will 
also be failed. "
+                    + "If this property is set to false, then connectors will 
be allowed to generate more than the maximum number of tasks, and existing sets 
of "
+                    + "tasks that exceed the tasks.max property will be 
allowed to run. "
+                    + "This property is deprecated and will be removed in an 
upcoming major release.";
+    public static final boolean TASKS_MAX_ENFORCE_DEFAULT = true;
+    private static final String TASKS_MAX_ENFORCE_DISPLAY = "Enforce tasks 
max";
+
     public static final String TRANSFORMS_CONFIG = "transforms";
     private static final String TRANSFORMS_DOC = "Aliases for the 
transformations to be applied to records.";
     private static final String TRANSFORMS_DISPLAY = "Transforms";
@@ -195,6 +205,7 @@ public class ConnectorConfig extends AbstractConfig {
                 .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, 
nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 
++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, 
CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, 
CONNECTOR_CLASS_DISPLAY)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, 
atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 
++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
+                .define(TASKS_MAX_ENFORCE_CONFIG, Type.BOOLEAN, 
TASKS_MAX_ENFORCE_DEFAULT, Importance.LOW, TASKS_MAX_ENFORCE_DOC, COMMON_GROUP, 
++orderInGroup, Width.SHORT, TASKS_MAX_ENFORCE_DISPLAY)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, 
KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, 
COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, 
VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, 
COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, 
HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, 
Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, 
Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
@@ -281,6 +292,14 @@ public class ConnectorConfig extends AbstractConfig {
         return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG);
     }
 
+    public int tasksMax() {
+        return getInt(TASKS_MAX_CONFIG);
+    }
+
+    public boolean enforceTasksMax() {
+        return getBoolean(TASKS_MAX_ENFORCE_CONFIG);
+    }
+
     /**
      * Returns the initialized list of {@link TransformationStage} which apply 
the
      * {@link Transformation transformations} and {@link Predicate predicates}
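
A minimal, hypothetical example of a connector configuration that uses the
deprecated escape hatch defined above. Only the two property names shown
('tasks.max' and 'tasks.max.enforce') come from this patch; the connector
class and everything else is illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class TasksMaxEnforceExample {
        // Builds an example connector configuration that opts out of tasks.max
        // enforcement. With "tasks.max.enforce" left at its default of "true",
        // a connector that generates more than "tasks.max" tasks is failed.
        public static Map<String, String> exampleConnectorProps() {
            Map<String, String> props = new HashMap<>();
            props.put("name", "example-source");
            props.put("connector.class", "com.example.ExampleSourceConnector");
            props.put("tasks.max", "4");
            props.put("tasks.max.enforce", "false"); // deprecated opt-out
            return props;
        }
    }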
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java
new file mode 100644
index 00000000000..62a2ea5a733
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Thrown when a connector has generated too many task configs (i.e., more 
tasks than
+ * the value for {@link ConnectorConfig#TASKS_MAX_CONFIG tasks.max} that it has
+ * been configured with).
+ */
+public class TooManyTasksException extends ConnectException {
+
+    public TooManyTasksException(String connName, int numTasks, int maxTasks) {
+        super(String.format(
+                "The connector %s has generated %d tasks, which is greater 
than %d, "
+                        + "the maximum number of tasks it is configured to 
create. "
+                        + "This behaviour should be considered a bug and is 
disallowed. "
+                        + "If necessary, it can be permitted by reconfiguring 
the connector "
+                        + "with '%s' set to false; however, this option will 
be removed in a "
+                        + "future release of Kafka Connect.",
+                connName,
+                numTasks,
+                maxTasks,
+                ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG
+        ));
+    }
+
+}
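
As a worked illustration (the connector name and task counts are made up), an
instance constructed as below produces a message that begins with the first
sentence asserted against in the integration test later in this patch:

    // Hypothetical values: connector "example-source" returned 3 task configs
    // while its tasks.max was 2.
    throw new TooManyTasksException("example-source", 3, 2);
    // Message begins: "The connector example-source has generated 3 tasks,
    // which is greater than 2, the maximum number of tasks it is configured
    // to create. ..."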
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index abdf064d0bb..81ab8617247 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -389,13 +389,21 @@ public class Worker {
             if (workerConnector == null)
                 throw new ConnectException("Connector " + connName + " not 
found in this worker.");
 
-            int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+            int maxTasks = connConfig.tasksMax();
             Map<String, String> connOriginals = connConfig.originalsStrings();
 
             Connector connector = workerConnector.connector();
             try (LoaderSwap loaderSwap = 
plugins.withClassLoader(workerConnector.loader())) {
                 String taskClassName = connector.taskClass().getName();
-                for (Map<String, String> taskProps : 
connector.taskConfigs(maxTasks)) {
+                List<Map<String, String>> taskConfigs = 
connector.taskConfigs(maxTasks);
+                try {
+                    checkTasksMax(connName, taskConfigs.size(), maxTasks, 
connConfig.enforceTasksMax());
+                } catch (TooManyTasksException e) {
+                    // TODO: This control flow is awkward. Push task config 
generation into WorkerConnector class?
+                    workerConnector.fail(e);
+                    throw e;
+                }
+                for (Map<String, String> taskProps : taskConfigs) {
                     // Ensure we don't modify the connector's copy of the 
config
                     Map<String, String> taskConfig = new HashMap<>(taskProps);
                     taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, 
taskClassName);
@@ -413,6 +421,26 @@ public class Worker {
         return result;
     }
 
+    private void checkTasksMax(String connName, int numTasks, int maxTasks, 
boolean enforce) {
+        if (numTasks > maxTasks) {
+            if (enforce) {
+                throw new TooManyTasksException(connName, numTasks, maxTasks);
+            } else {
+                log.warn(
+                        "The connector {} has generated {} tasks, which is 
greater than {}, "
+                                + "the maximum number of tasks it is 
configured to create. "
+                                + "This behavior should be considered a bug 
and will be disallowed "
+                                + "in future releases of Kafka Connect. Please 
report this to the "
+                                + "maintainers of the connector and request 
that they adjust their "
+                                + "connector's taskConfigs() method to respect 
the maxTasks parameter.",
+                        connName,
+                        numTasks,
+                        maxTasks
+                );
+            }
+        }
+    }
+
     /**
      * Stop a connector managed by this worker.
      *
@@ -522,12 +550,12 @@ public class Worker {
     /**
      * Start a sink task managed by this worker.
      *
-     * @param id the task ID.
-     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
-     * @param connProps the connector properties.
-     * @param taskProps the tasks properties.
+     * @param id             the task ID.
+     * @param configState    the most recent {@link ClusterConfigState} known 
to the worker
+     * @param connProps      the connector properties.
+     * @param taskProps      the tasks properties.
      * @param statusListener a listener for the runtime status transitions of 
the task.
-     * @param initialState the initial state of the connector.
+     * @param initialState   the initial state of the connector.
      * @return true if the task started successfully.
      */
     public boolean startSinkTask(
@@ -538,19 +566,19 @@ public class Worker {
             TaskStatus.Listener statusListener,
             TargetState initialState
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
+        return startTask(id, connProps, taskProps, configState, statusListener,
                 new SinkTaskBuilder(id, configState, statusListener, 
initialState));
     }
 
     /**
      * Start a source task managed by this worker using older behavior that 
does not provide exactly-once support.
      *
-     * @param id the task ID.
-     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
-     * @param connProps the connector properties.
-     * @param taskProps the tasks properties.
+     * @param id             the task ID.
+     * @param configState    the most recent {@link ClusterConfigState} known 
to the worker
+     * @param connProps      the connector properties.
+     * @param taskProps      the tasks properties.
      * @param statusListener a listener for the runtime status transitions of 
the task.
-     * @param initialState the initial state of the connector.
+     * @param initialState   the initial state of the connector.
      * @return true if the task started successfully.
      */
     public boolean startSourceTask(
@@ -561,20 +589,20 @@ public class Worker {
             TaskStatus.Listener statusListener,
             TargetState initialState
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
+        return startTask(id, connProps, taskProps, configState, statusListener,
                 new SourceTaskBuilder(id, configState, statusListener, 
initialState));
     }
 
     /**
      * Start a source task with exactly-once support managed by this worker.
      *
-     * @param id the task ID.
-     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
-     * @param connProps the connector properties.
-     * @param taskProps the tasks properties.
-     * @param statusListener a listener for the runtime status transitions of 
the task.
-     * @param initialState the initial state of the connector.
-     * @param preProducerCheck a preflight check that should be performed 
before the task initializes its transactional producer.
+     * @param id                the task ID.
+     * @param configState       the most recent {@link ClusterConfigState} 
known to the worker
+     * @param connProps         the connector properties.
+     * @param taskProps         the tasks properties.
+     * @param statusListener    a listener for the runtime status transitions 
of the task.
+     * @param initialState      the initial state of the connector.
+     * @param preProducerCheck  a preflight check that should be performed 
before the task initializes its transactional producer.
      * @param postProducerCheck a preflight check that should be performed 
after the task initializes its transactional producer,
      *                          but before producing any source records or 
offsets.
      * @return true if the task started successfully.
@@ -589,7 +617,7 @@ public class Worker {
             Runnable preProducerCheck,
             Runnable postProducerCheck
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
+        return startTask(id, connProps, taskProps, configState, statusListener,
                 new ExactlyOnceSourceTaskBuilder(id, configState, 
statusListener, initialState, preProducerCheck, postProducerCheck));
     }
 
@@ -599,6 +627,7 @@ public class Worker {
      * @param id the task ID.
      * @param connProps the connector properties.
      * @param taskProps the tasks properties.
+     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
      * @param statusListener a listener for the runtime status transitions of 
the task.
      * @param taskBuilder the {@link TaskBuilder} used to create the {@link 
WorkerTask} that manages the lifecycle of the task.
      * @return true if the task started successfully.
@@ -607,6 +636,7 @@ public class Worker {
             ConnectorTaskId id,
             Map<String, String> connProps,
             Map<String, String> taskProps,
+            ClusterConfigState configState,
             TaskStatus.Listener statusListener,
             TaskBuilder<?, ?> taskBuilder
     ) {
@@ -624,6 +654,11 @@ public class Worker {
 
             try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
                 final ConnectorConfig connConfig = new 
ConnectorConfig(plugins, connProps);
+
+                int maxTasks = connConfig.tasksMax();
+                int numTasks = configState.taskCount(id.connector());
+                checkTasksMax(id.connector(), numTasks, maxTasks, 
connConfig.enforceTasksMax());
+
                 final TaskConfig taskConfig = new TaskConfig(taskProps);
                 final Class<? extends Task> taskClass = 
taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
                 final Task task = plugins.newTask(taskClass);
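
The warning above asks connector maintainers to make taskConfigs() respect the
maxTasks parameter. A minimal, self-contained sketch of that shape (the
partition list and config key are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BoundedTaskConfigsSketch {
        // Pretend units of work that need to be spread across tasks.
        private static final List<String> PARTITIONS =
                Arrays.asList("p0", "p1", "p2", "p3", "p4");

        public static List<Map<String, String>> taskConfigs(int maxTasks) {
            // Never generate more task configs than tasks.max allows.
            int numTasks = Math.min(maxTasks, PARTITIONS.size());
            List<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                // Round-robin the partitions across the tasks.
                StringBuilder assigned = new StringBuilder();
                for (int p = i; p < PARTITIONS.size(); p += numTasks) {
                    if (assigned.length() > 0)
                        assigned.append(',');
                    assigned.append(PARTITIONS.get(p));
                }
                Map<String, String> config = new HashMap<>();
                config.put("task.partitions", assigned.toString());
                configs.add(config);
            }
            return configs;
        }
    }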
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index f2a289f92c7..4bbe1f42b8b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -73,6 +73,7 @@ public class WorkerConnector implements Runnable {
     private final AtomicReference<TargetState> pendingTargetStateChange;
     private final AtomicReference<Callback<TargetState>> 
pendingStateChangeCallback;
     private final CountDownLatch shutdownLatch;
+    private volatile Throwable externalFailure;
     private volatile boolean stopping;  // indicates whether the Worker has 
asked the connector to stop
     private volatile boolean cancelled; // indicates whether the Worker has 
cancelled the connector (e.g. because of slow shutdown)
 
@@ -102,6 +103,7 @@ public class WorkerConnector implements Runnable {
         this.pendingTargetStateChange = new AtomicReference<>();
         this.pendingStateChangeCallback = new AtomicReference<>();
         this.shutdownLatch = new CountDownLatch(1);
+        this.externalFailure = null;
         this.stopping = false;
         this.cancelled = false;
     }
@@ -131,9 +133,27 @@ public class WorkerConnector implements Runnable {
         }
     }
 
+    /**
+     * Fail the connector.
+     * @param cause the cause of the failure; if null, the connector will not 
be failed
+     */
+    public void fail(Throwable cause) {
+        synchronized (this) {
+            if (this.externalFailure != null)
+                return;
+            log.error("{} Connector has failed", this, cause);
+            this.externalFailure = cause;
+            notify();
+        }
+    }
+
     void doRun() {
         initialize();
         while (!stopping) {
+            Throwable failure = externalFailure;
+            if (failure != null)
+                onFailure(failure);
+
             TargetState newTargetState;
             Callback<TargetState> stateChangeCallback;
             synchronized (this) {
@@ -144,7 +164,10 @@ public class WorkerConnector implements Runnable {
                 doTransitionTo(newTargetState, stateChangeCallback);
             }
             synchronized (this) {
-                if (pendingTargetStateChange.get() != null || stopping) {
+                if (pendingTargetStateChange.get() != null
+                        || (!State.FAILED.equals(state) && externalFailure != 
null)
+                        || stopping
+                ) {
                     // An update occurred before we entered the synchronized 
block; no big deal,
                     // just start the loop again until we've handled everything
                 } else {
@@ -203,7 +226,11 @@ public class WorkerConnector implements Runnable {
         }
     }
 
-    private void onFailure(Throwable t) {
+    private synchronized void onFailure(Throwable t) {
+        // If we've already failed, we don't overwrite the last-reported cause 
of failure
+        if (this.state == State.FAILED)
+            return;
+
         statusListener.onFailure(connName, t);
         this.state = State.FAILED;
     }
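
The fail() path above is driven from another thread (the Worker) while doRun()
owns the connector's state machine. A standalone sketch of that hand-off
pattern, with all names hypothetical and the state machine reduced to a flag:

    public class ExternalFailureSketch {
        private volatile Throwable externalFailure;
        private boolean failed; // only touched by the owning loop's thread

        // Called from another thread to report a failure.
        public synchronized void fail(Throwable cause) {
            if (externalFailure != null)
                return;            // keep the first reported cause
            externalFailure = cause;
            notifyAll();           // wake the run loop if it is waiting
        }

        // One iteration of the owning thread's loop: observe and record the
        // failure at most once, mirroring the FAILED-state check above.
        public void pollForFailure() {
            Throwable failure = externalFailure;
            if (failure != null && !failed) {
                failed = true;
                System.err.println("Connector failed: " + failure);
            }
        }
    }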
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 324f4cd5de7..c4e5bb15584 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -50,6 +50,7 @@ import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.TooManyTasksException;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
@@ -154,7 +155,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(10);
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS = 
250;
-    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 
60000;
+    static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 60000;
     private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
     private static final short BACKOFF_RETRIES = 5;
@@ -2139,6 +2140,9 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             if (error != null) {
                 if (isPossibleExpiredKeyException(initialRequestTime, error)) {
                     log.debug("Failed to reconfigure connector's tasks ({}), 
possibly due to expired session key. Retrying after backoff", connName);
+                } else if (error instanceof TooManyTasksException) {
+                    log.debug("Connector {} generated too many tasks; will not 
retry configuring connector", connName);
+                    return;
                 } else {
                     log.error("Failed to reconfigure connector's tasks ({}), 
retrying after backoff.", connName, error);
                 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 8465a1e31e3..d39632a48e5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@@ -146,6 +147,7 @@ public class BlockingConnectorTest {
         connect.stop();
         // unblock everything so that we don't leak threads after each test run
         Block.reset();
+        Block.join();
     }
 
     @Test
@@ -442,11 +444,29 @@ public class BlockingConnectorTest {
             resetAwaitBlockLatch();
             BLOCK_LATCHES.forEach(CountDownLatch::countDown);
             BLOCK_LATCHES.clear();
+        }
+
+        /**
+         * {@link Thread#join(long millis) Await} the termination of all 
threads that have been
+         * intentionally blocked since the last invocation of this method or, 
if this method has
+         * never been invoked, of all threads that have ever been blocked.
+         */
+        public static synchronized void join() {
             BLOCKED_THREADS.forEach(t -> {
                 try {
                     t.join(30_000);
                     if (t.isAlive()) {
-                        log.warn("Thread {} failed to finish in time", t);
+                        log.warn(
+                                "Thread {} failed to finish in time; current 
stack trace:\n{}",
+                                t,
+                                Stream.of(t.getStackTrace())
+                                        .map(s -> String.format(
+                                                "\t%s.%s:%d",
+                                                s.getClassName(),
+                                                s.getMethodName(),
+                                                s.getLineNumber()
+                                        )).collect(Collectors.joining("\n"))
+                        );
                     }
                 } catch (InterruptedException e) {
                     throw new RuntimeException("Interrupted while waiting for 
blocked thread " + t + " to finish");
@@ -777,7 +797,7 @@ public class BlockingConnectorTest {
 
         @Override
         public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return IntStream.rangeClosed(0, maxTasks)
+            return IntStream.range(0, maxTasks)
                 .mapToObj(i -> new HashMap<>(props))
                 .collect(Collectors.toList());
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 9dca7425d66..d651ef87090 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -18,12 +18,17 @@ package org.apache.kafka.connect.integration;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
@@ -54,6 +59,7 @@ import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
@@ -881,6 +887,159 @@ public class ConnectWorkerIntegrationTest {
         connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
     }
 
+    /**
+     * Tests the logic around enforcement of the
+     * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+     * property and how it can be toggled via the
+     * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+     * property, following the test plan laid out in
+     * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+     */
+    @Test
+    public void testTasksMaxEnforcement() throws Exception {
+        String configTopic = "tasks-max-enforcement-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                NUM_WORKERS,
+                "Initial group of workers did not start in time."
+        );
+
+        Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+        int maxTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+        int numTasks = 2;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector that generates excessive tasks will be failed with an 
expected error message
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                0,
+                "connector did not fail in time"
+        );
+
+        String expectedErrorSnippet = String.format(
+                "The connector %s has generated %d tasks, which is greater 
than %d, "
+                        + "the maximum number of tasks it is configured to 
create. ",
+                CONNECTOR_NAME,
+                numTasks,
+                maxTasks
+        );
+        String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+        assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+        // Stop all workers in the cluster
+        connect.workers().forEach(connect::removeWorker);
+
+        // Publish a set of too many task configs to the config topic, to 
simulate
+        // an existing set of task configs that was written before the cluster 
was upgraded
+        try (JsonConverter converter = new JsonConverter()) {
+            converter.configure(
+                    
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
+                    false
+            );
+
+            for (int i = 0; i < numTasks; i++) {
+                Map<String, String> taskConfig = 
MonitorableSourceConnector.taskConfig(
+                        connectorProps,
+                        CONNECTOR_NAME,
+                        i
+                );
+                Struct wrappedTaskConfig = new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0)
+                        .put("properties", taskConfig);
+                String key = KafkaConfigBackingStore.TASK_KEY(new 
ConnectorTaskId(CONNECTOR_NAME, i));
+                byte[] value = converter.fromConnectData(
+                        configTopic,
+                        KafkaConfigBackingStore.TASK_CONFIGURATION_V0,
+                        wrappedTaskConfig
+                );
+                connect.kafka().produce(configTopic, key, new String(value));
+            }
+
+            Struct taskCommitMessage = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+            taskCommitMessage.put("tasks", numTasks);
+            String key = 
KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME);
+            byte[] value = converter.fromConnectData(
+                    configTopic,
+                    KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0,
+                    taskCommitMessage
+            );
+            connect.kafka().produce(configTopic, key, new String(value));
+        }
+
+        // Restart all the workers in the cluster
+        for (int i = 0; i < NUM_WORKERS; i++)
+            connect.addWorker();
+
+        // An existing set of tasks that exceeds the tasks.max property
+        // will be failed with an expected error message
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not fail in time"
+        );
+
+        connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "false");
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // That same existing set of tasks will be allowed to run
+        // once the connector is reconfigured with tasks.max.enforce set to 
false
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks++;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector will be allowed to generate excessive tasks when 
tasks.max.enforce is set to false
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks = maxTasks;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "true");
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks = maxTasks + 1;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector that generates excessive tasks after being reconfigured 
will be failed, but its existing tasks will continue running
+        connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
+                CONNECTOR_NAME,
+                maxTasks,
+                "connector did not fail in time, or tasks were incorrectly 
failed"
+        );
+
+        // Make sure that the tasks have had a chance to fail (i.e., that the 
worker has been given
+        // a chance to check on the number of tasks for the connector during 
task startup)
+        for (int i = 0; i < maxTasks; i++)
+            connect.restartTask(CONNECTOR_NAME, i);
+
+        // Verify one more time that none of the tasks have actually failed
+        connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
+                CONNECTOR_NAME,
+                maxTasks,
+                "connector did not fail in time, or tasks were incorrectly 
failed"
+        );
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup props for the source connector
         Map<String, String> props = new HashMap<>();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 1645aa0ecc2..049c75727a0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -51,6 +51,7 @@ public class MonitorableSourceConnector extends 
SampleSourceConnector {
     private static final Logger log = 
LoggerFactory.getLogger(MonitorableSourceConnector.class);
 
     public static final String TOPIC_CONFIG = "topic";
+    public static final String NUM_TASKS = "num.tasks";
     public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
     public static final String MAX_MESSAGES_PER_SECOND_CONFIG = "throughput";
     public static final String MAX_MESSAGES_PRODUCED_CONFIG = "max.messages";
@@ -93,16 +94,27 @@ public class MonitorableSourceConnector extends 
SampleSourceConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        String numTasksProp = commonConfigs.get(NUM_TASKS);
+        int numTasks = numTasksProp != null ? Integer.parseInt(numTasksProp) : 
maxTasks;
         List<Map<String, String>> configs = new ArrayList<>();
-        for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> config = new HashMap<>(commonConfigs);
-            config.put("connector.name", connectorName);
-            config.put("task.id", taskId(connectorName, i));
+        for (int i = 0; i < numTasks; i++) {
+            Map<String, String> config = taskConfig(commonConfigs, 
connectorName, i);
             configs.add(config);
         }
         return configs;
     }
 
+    public static Map<String, String> taskConfig(
+            Map<String, String> connectorProps,
+            String connectorName,
+            int taskNum
+    ) {
+        Map<String, String> result = new HashMap<>(connectorProps);
+        result.put("connector.name", connectorName);
+        result.put("task.id", taskId(connectorName, taskNum));
+        return result;
+    }
+
     @Override
     public void stop() {
         log.info("Stopped {} connector {}", this.getClass().getSimpleName(), 
connectorName);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index f4c79ab44a3..b632267598f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -119,6 +119,8 @@ public class OffsetsApiIntegrationTest {
     public static void close() {
         // stop all Connect, Kafka and Zk threads.
         CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
+        // wait for all blocked threads created while testing zombie task 
scenarios to finish
+        BlockingConnectorTest.Block.join();
     }
 
     private static EmbeddedConnectCluster 
createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
@@ -469,6 +471,7 @@ public class OffsetsApiIntegrationTest {
                 () -> connect.alterConnectorOffsets(connectorName, new 
ConnectorOffsets(offsetsToAlter)));
         assertThat(e.getMessage(), containsString("zombie sink task"));
 
+        // clean up blocked threads created while testing zombie task scenarios
         BlockingConnectorTest.Block.reset();
     }
 
@@ -810,6 +813,7 @@ public class OffsetsApiIntegrationTest {
         ConnectRestException e = assertThrows(ConnectRestException.class, () 
-> connect.resetConnectorOffsets(connectorName));
         assertThat(e.getMessage(), containsString("zombie sink task"));
 
+        // clean up blocked threads created while testing zombie task scenarios
         BlockingConnectorTest.Block.reset();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 7d7020604cc..7c39ce536be 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -470,8 +470,8 @@ public class AbstractHerderTest {
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        // Base connector config has 14 fields, connector's configs add 7
-        assertEquals(21, infos.size());
+        // Base connector config has 15 fields, connector's configs add 7
+        assertEquals(22, infos.size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG,
                 infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@@ -582,7 +582,7 @@ public class AbstractHerderTest {
         assertEquals(1, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(26, infos.size());
+        assertEquals(27, infos.size());
         // Should get 2 type fields from the transforms, first adds its own 
config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -639,7 +639,7 @@ public class AbstractHerderTest {
         assertEquals(1, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(28, infos.size());
+        assertEquals(29, infos.size());
         // Should get 2 type fields from the transforms, first adds its own 
config since it has a valid class
         assertEquals("transforms.xformA.type", 
infos.get("transforms.xformA.type").configValue().name());
         
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
@@ -700,8 +700,8 @@ public class AbstractHerderTest {
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(1, result.errorCount());
-        // Base connector config has 14 fields, connector's configs add 7, and 
2 producer overrides
-        assertEquals(23, result.values().size());
+        // Base connector config has 15 fields, connector's configs add 7, and 
2 producer overrides
+        assertEquals(24, result.values().size());
         assertTrue(result.values().stream().anyMatch(
             configInfo -> ackConfigKey.equals(configInfo.configValue().name()) 
&& !configInfo.configValue().errors().isEmpty()));
         assertTrue(result.values().stream().anyMatch(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e424da0bc93..c48abb0130a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -42,6 +42,7 @@ import 
org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -130,6 +131,8 @@ import static 
org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_C
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
@@ -619,7 +622,21 @@ public class WorkerTest {
 
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, 
anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+
+        Map<String, String> connectorConfigs = anyConnectorConfigMap();
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                Collections.singletonMap(CONNECTOR_ID, 1),
+                Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+                Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+        assertTrue(worker.startSourceTask(TASK_ID, configState, 
connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
         assertStatistics(worker, 0, 1);
         assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
@@ -662,7 +679,19 @@ public class WorkerTest {
         connectorConfigs.put(TOPICS_CONFIG, "t1");
         connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
SampleSinkConnector.class.getName());
 
-        worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, 
connectorConfigs, origProps, taskStatusListener, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                Collections.singletonMap(CONNECTOR_ID, 1),
+                Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+                Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+        assertTrue(worker.startSinkTask(TASK_ID, configState, 
connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
         assertStatistics(worker, 0, 1);
         assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
@@ -718,7 +747,22 @@ public class WorkerTest {
 
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY,  
anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, 
preProducer, postProducer);
+
+        Map<String, String> connectorConfigs = anyConnectorConfigMap();
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                Collections.singletonMap(CONNECTOR_ID, 1),
+                Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+                Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+
+        assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, configState,  
connectorConfigs, origProps, taskStatusListener, TargetState.STARTED, 
preProducer, postProducer));
         assertStatistics(worker, 0, 1);
         assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
@@ -784,7 +828,7 @@ public class WorkerTest {
             ClusterConfigState.EMPTY,
             anyConnectorConfigMap(),
             origProps,
-            taskStatusListener,
+                taskStatusListener,
             TargetState.STARTED);
 
         assertStatusMetrics(1L, "connector-running-task-count");
@@ -2522,6 +2566,228 @@ public class WorkerTest {
         verifyKafkaClusterId();
     }
 
+    @Test
+    public void testConnectorGeneratesTooManyTasksButMaxNotEnforced() throws 
Exception {
+        testConnectorGeneratesTooManyTasks(false);
+    }
+
+    @Test
+    public void testConnectorGeneratesTooManyTasksAndMaxEnforced() throws 
Exception {
+        testConnectorGeneratesTooManyTasks(true);
+    }
+
+    private void testConnectorGeneratesTooManyTasks(boolean enforced) throws 
Exception {
+        mockKafkaClusterId();
+
+        String connectorClass = SampleSourceConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+        connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, 
Boolean.toString(enforced));
+        mockConnectorIsolation(connectorClass, sourceConnector);
+
+        mockExecutorRealSubmit(WorkerConnector.class);
+
+        // Use doReturn().when() syntax due to when().thenReturn() not being 
able to return wildcard generic types
+        
doReturn(SampleSourceConnector.SampleSourceTask.class).when(sourceConnector).taskClass();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, allConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        FutureCallback<TargetState> onFirstStart = new FutureCallback<>();
+        worker.startConnector(CONNECTOR_ID, connectorProps, ctx, 
connectorStatusListener, TargetState.STARTED, onFirstStart);
+        // Wait for the connector to actually start
+        assertEquals(TargetState.STARTED, onFirstStart.get(1000, 
TimeUnit.MILLISECONDS));
+
+        Map<String, String> taskConfig = new HashMap<>();
+
+        // No warnings or exceptions when a connector generates an empty list 
of task configs
+        when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList());
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Worker.class)) {
+            connectorProps.put(TASKS_MAX_CONFIG, "1");
+            List<Map<String, String>> taskConfigs = 
worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, 
connectorProps));
+            assertEquals(0, taskConfigs.size());
+            assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> 
e.getLevel().equals("WARN")));
+        }
+
+        // No warnings or exceptions when a connector generates the maximum 
permitted number of task configs
+        
when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
+        
when(sourceConnector.taskConfigs(2)).thenReturn(Arrays.asList(taskConfig, 
taskConfig));
+        
when(sourceConnector.taskConfigs(3)).thenReturn(Arrays.asList(taskConfig, 
taskConfig, taskConfig));
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Worker.class)) {
+            connectorProps.put(TASKS_MAX_CONFIG, "1");
+            List<Map<String, String>> taskConfigs = 
worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, 
connectorProps));
+            assertEquals(1, taskConfigs.size());
+
+            connectorProps.put(TASKS_MAX_CONFIG, "2");
+            taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new 
ConnectorConfig(plugins, connectorProps));
+            assertEquals(2, taskConfigs.size());
+
+            connectorProps.put(TASKS_MAX_CONFIG, "3");
+            taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new 
ConnectorConfig(plugins, connectorProps));
+            assertEquals(3, taskConfigs.size());
+
+            assertEquals(Collections.emptyList(), 
logCaptureAppender.getMessages("WARN"));
+            assertEquals(Collections.emptyList(), 
logCaptureAppender.getMessages("ERROR"));
+        }
+
+        // Warning/exception when a connector generates too many task configs
+        List<Map<String, String>> tooManyTaskConfigs = 
Arrays.asList(taskConfig, taskConfig, taskConfig, taskConfig);
+        when(sourceConnector.taskConfigs(1)).thenReturn(tooManyTaskConfigs);
+        when(sourceConnector.taskConfigs(2)).thenReturn(tooManyTaskConfigs);
+        when(sourceConnector.taskConfigs(3)).thenReturn(tooManyTaskConfigs);
+        for (int i = 0; i < 3; i++) {
+            try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Worker.class)) {
+                int tasksMax = i + 1;
+                connectorProps.put(TASKS_MAX_CONFIG, 
Integer.toString(tasksMax));
+                String tasksMaxExceededMessage;
+                if (enforced) {
+                    TooManyTasksException e = assertThrows(
+                            TooManyTasksException.class,
+                            () -> worker.connectorTaskConfigs(
+                                    CONNECTOR_ID,
+                                    new ConnectorConfig(plugins, 
connectorProps)
+                            )
+                    );
+                    tasksMaxExceededMessage = e.getMessage();
+                } else {
+                    List<Map<String, String>> taskConfigs = 
worker.connectorTaskConfigs(
+                            CONNECTOR_ID,
+                            new ConnectorConfig(plugins, connectorProps)
+                    );
+                    assertEquals(tooManyTaskConfigs.size(), 
taskConfigs.size());
+                    List<String> warningMessages = 
logCaptureAppender.getMessages("WARN");
+                    assertEquals(1, warningMessages.size());
+                    tasksMaxExceededMessage = warningMessages.get(0);
+                }
+                assertTasksMaxExceededMessage(
+                        CONNECTOR_ID,
+                        tooManyTaskConfigs.size(), tasksMax,
+                        tasksMaxExceededMessage
+                );
+
+                // Regardless of enforcement, there should never be any 
error-level log messages
+                assertEquals(Collections.emptyList(), 
logCaptureAppender.getMessages("ERROR"));
+            }
+        }
+
+        // One last sanity check in case the connector is reconfigured and respects tasks.max
+        when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
+            connectorProps.put(TASKS_MAX_CONFIG, "1");
+            List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
+            assertEquals(1, taskConfigs.size());
+
+            assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN"));
+            assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR"));
+        }
+
+        worker.stop();
+    }
+
+    @Test
+    public void testStartTaskWithTooManyTaskConfigsButMaxNotEnforced() {
+        testStartTaskWithTooManyTaskConfigs(false);
+    }
+
+    @Test
+    public void testStartTaskWithTooManyTaskConfigsAndMaxEnforced() {
+        testStartTaskWithTooManyTaskConfigs(true);
+    }
+
+    private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
+        SinkTask task = mock(TestSinkTask.class);
+        mockKafkaClusterId();
+
+        Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                noneConnectorClientConfigOverridePolicy, null);
+        worker.herder = herder;
+        worker.start();
+
+        assertStatistics(worker, 0, 0);
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        Map<String, String> connectorConfigs = anyConnectorConfigMap();
+        connectorConfigs.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced));
+        connectorConfigs.put(TOPICS_CONFIG, "t1");
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+        // The connector is configured to generate at most one task config...
+        int maxTasks = 1;
+        connectorConfigs.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+
+        String connName = TASK_ID.connector();
+        int numTasks = 2;
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                // ... but it has generated two task configs
+                Collections.singletonMap(connName, numTasks),
+                Collections.singletonMap(connName, connectorConfigs),
+                Collections.singletonMap(connName, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+
+        String tasksMaxExceededMessage;
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
+            if (enforced) {
+                assertFalse(worker.startSinkTask(
+                        TASK_ID,
+                        configState,
+                        connectorConfigs,
+                        origProps,
+                        taskStatusListener,
+                        TargetState.STARTED
+                ));
+
+                ArgumentCaptor<Throwable> failureCaptor = ArgumentCaptor.forClass(Throwable.class);
+                verify(taskStatusListener, times(1)).onFailure(eq(TASK_ID), failureCaptor.capture());
+                assertTrue(
+                        "Expected task start exception to be TooManyTasksException, but was "
+                                + failureCaptor.getValue().getClass() + " instead",
+                        failureCaptor.getValue() instanceof TooManyTasksException
+                );
+
+                tasksMaxExceededMessage = failureCaptor.getValue().getMessage();
+            } else {
+                mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
+                mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
+                mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
+                mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+                mockExecutorFakeSubmit(WorkerTask.class);
+
+                assertTrue(worker.startSinkTask(
+                        TASK_ID,
+                        configState,
+                        connectorConfigs,
+                        origProps,
+                        taskStatusListener,
+                        TargetState.STARTED
+                ));
+
+                List<String> warningMessages = logCaptureAppender.getMessages("WARN");
+                assertEquals(1, warningMessages.size());
+                tasksMaxExceededMessage = warningMessages.get(0);
+            }
+            assertTasksMaxExceededMessage(connName, numTasks, maxTasks, tasksMaxExceededMessage);
+        }
+    }
+
+    private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) {
+        String expectedPrefix = "The connector " + connector
+                + " has generated "
+                + numTasks + " tasks, which is greater than "
+                + maxTasks;
+        assertTrue(
+                "Warning/exception message '"
+                        + message + "' did not start with the expected prefix '"
+                        + expectedPrefix + "'",
+                message.startsWith(expectedPrefix)
+        );
+    }
+
     private void assertStatusMetrics(long expected, String metricName) {
         MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
         if (expected == 0L) {
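
The WorkerTest changes above exercise both the existing tasks.max property and the new enforcement toggle referenced in the tests as TASKS_MAX_ENFORCE_CONFIG. A minimal sketch of a connector configuration that keeps the legacy warn-only behavior, assuming the toggle maps to a connector property named tasks.max.enforce (that property name and the connector class below are illustrative assumptions, not taken from this diff):

    import java.util.HashMap;
    import java.util.Map;

    public class TasksMaxConfigSketch {
        // Sketch only: builds connector props that keep the legacy behavior of
        // logging a warning (rather than failing) when a connector generates more
        // task configs than tasks.max allows. The "tasks.max.enforce" name is an
        // assumption based on TASKS_MAX_ENFORCE_CONFIG; the connector class is hypothetical.
        public static Map<String, String> warnOnlyProps() {
            Map<String, String> props = new HashMap<>();
            props.put("name", "example-connector");
            props.put("connector.class", "com.example.ExampleSourceConnector");
            props.put("tasks.max", "3");              // upper bound on generated task configs
            props.put("tasks.max.enforce", "false");  // assumed opt-out flag; enforcement assumed on by default
            return props;
        }
    }
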
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 73911dcdcd2..6c5ce0fdb53 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.TooManyTasksException;
 import org.apache.kafka.connect.runtime.TopicStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -3267,6 +3268,47 @@ public class DistributedHerderTest {
         expectAndVerifyTaskReconfigurationRetries();
     }
 
+    @Test
+    public void testTaskReconfigurationNoRetryWithTooManyTasks() {
+        // initial tick
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        when(worker.isRunning(CONN1)).thenReturn(true);
+        when(worker.getPlugins()).thenReturn(plugins);
+
+        herder.tick();
+        // No requests are queued, so we shouldn't plan on waking up without external action
+        // (i.e., rebalance, user request, or shutdown)
+        // This helps indicate that no retriable operations (such as generating task configs after
+        // a backoff period) are queued up by the worker
+        verify(member, times(1)).poll(eq(Long.MAX_VALUE), any());
+
+        // Process the task reconfiguration request in this tick
+        int numTasks = MAX_TASKS + 5;
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        // Fail to generate tasks because the connector provided too many task configs
+        when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .thenThrow(new TooManyTasksException(CONN1, numTasks, MAX_TASKS));
+
+        herder.requestTaskReconfiguration(CONN1);
+        herder.tick();
+        // We tried to generate task configs for the connector one time during this tick
+        verify(worker, times(1)).connectorTaskConfigs(CONN1, sinkConnectorConfig);
+        // Verifying again that no requests are queued
+        verify(member, times(2)).poll(eq(Long.MAX_VALUE), any());
+        verifyNoMoreInteractions(worker);
+
+        time.sleep(DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS);
+        herder.tick();
+        // We ticked one more time, and no further attempt was made to generate task configs
+        verifyNoMoreInteractions(worker);
+        // And we don't have any requests queued
+        verify(member, times(3)).poll(eq(Long.MAX_VALUE), any());
+    }
+
     @Test
     public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
         herder = mock(DistributedHerder.class, withSettings().defaultAnswer(CALLS_REAL_METHODS).useConstructor(new DistributedConfig(HERDER_CONFIG),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
index e2daad66fca..515d0267460 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
@@ -430,6 +430,29 @@ public class ConnectAssertions {
         );
     }
 
+    /**
+     * Assert that a connector is in FAILED state, that it has a specific number of tasks, and that all of
+     * its tasks are in the RUNNING state.
+     *
+     * @param connectorName the connector name
+     * @param numTasks the number of tasks
+     * @param detailMessage the assertion message
+     * @throws InterruptedException if this thread is interrupted while waiting for the connector and its tasks to reach the expected states
+     */
+    public void assertConnectorIsFailedAndNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
+            throws InterruptedException {
+        waitForConnectorState(
+                connectorName,
+                AbstractStatus.State.FAILED,
+                exactly(numTasks),
+                null,
+                AbstractStatus.State.RUNNING,
+                "Either the connector is running or not all the " + numTasks + " tasks are running.",
+                detailMessage,
+                CONNECTOR_SETUP_DURATION_MS
+        );
+    }
+
     /**
      * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
      *
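
A hedged usage sketch of the new assertion from an integration test, assuming an EmbeddedConnectCluster field named connect whose assertions() accessor returns this ConnectAssertions instance, as in existing Connect integration tests; the connector name, task count, and detail message below are illustrative:

    // Sketch only: after tasks.max enforcement fails the connector, its previously
    // started tasks are expected to stay RUNNING while the connector itself is FAILED.
    connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
            "example-connector",
            2,
            "Connector did not fail, or not all of its tasks kept running"
    );
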
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index af6c60e847e..0e58d633584 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -351,6 +351,23 @@ abstract class EmbeddedConnect {
         }
     }
 
+    /**
+     * Restart an existing task.
+     *
+     * @param connName name of the connector
+     * @param taskNum ID of the task (starting from 0)
+     * @throws ConnectRestException if the REST API returns error status
+     * @throws ConnectException for any other error.
+     */
+    public void restartTask(String connName, int taskNum) {
+        String url = endpointForResource(String.format("connectors/%s/tasks/%d/restart", connName, taskNum));
+        Response response = requestPost(url, "", Collections.emptyMap());
+        if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new ConnectRestException(response.getStatus(),
+                    "Could not execute POST request. Error response: " + responseToString(response));
+        }
+    }
+
     /**
      * Restart an existing connector and its tasks.
      *

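Finally, a hedged sketch of exercising the new restartTask helper from a test, again assuming an EmbeddedConnectCluster named connect; the connector name and task index are illustrative:

    // Sketch only: restarts task 0 of the connector by POSTing to
    // connectors/example-connector/tasks/0/restart; an error status from the
    // REST API surfaces as a ConnectRestException.
    connect.restartTask("example-connector", 0);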