This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4f0a4059083 KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180) 4f0a4059083 is described below commit 4f0a40590833a141c78341ce95ffc782747c5ac8 Author: Chris Egerton <chr...@aiven.io> AuthorDate: Thu Feb 1 11:33:04 2024 -0500 KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180) Reviewers: Ashwin Pankaj <apan...@confluent.io>, Greg Harris <greg.har...@aiven.io> --- checkstyle/suppressions.xml | 2 +- .../kafka/common/utils/LogCaptureAppender.java | 8 + .../kafka/connect/runtime/ConnectorConfig.java | 19 ++ .../connect/runtime/TooManyTasksException.java | 43 ++++ .../org/apache/kafka/connect/runtime/Worker.java | 79 ++++-- .../kafka/connect/runtime/WorkerConnector.java | 31 ++- .../runtime/distributed/DistributedHerder.java | 6 +- .../connect/integration/BlockingConnectorTest.java | 24 +- .../integration/ConnectWorkerIntegrationTest.java | 159 ++++++++++++ .../integration/MonitorableSourceConnector.java | 20 +- .../integration/OffsetsApiIntegrationTest.java | 4 + .../kafka/connect/runtime/AbstractHerderTest.java | 12 +- .../apache/kafka/connect/runtime/WorkerTest.java | 274 ++++++++++++++++++++- .../runtime/distributed/DistributedHerderTest.java | 42 ++++ .../connect/util/clusters/ConnectAssertions.java | 23 ++ .../connect/util/clusters/EmbeddedConnect.java | 17 ++ 16 files changed, 721 insertions(+), 42 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9b63e009693..925c50dec86 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -151,7 +151,7 @@ files="LoggingResource.java" /> <suppress checks="ClassDataAbstractionCoupling" - files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/> + files="(RestServer|AbstractHerder|DistributedHerder|Worker(Test)?).java"/> <suppress checks="BooleanExpressionComplexity" files="JsonConverter.java"/> diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java index 4f035840bd2..eb592c11863 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java @@ -24,6 +24,7 @@ import org.apache.log4j.spi.LoggingEvent; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseable { private final List<LoggingEvent> events = new LinkedList<>(); @@ -100,6 +101,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl } } + public List<String> getMessages(String level) { + return getEvents().stream() + .filter(e -> level.equals(e.getLevel())) + .map(Event::getMessage) + .collect(Collectors.toList()); + } + public List<String> getMessages() { final LinkedList<String> result = new LinkedList<>(); synchronized (events) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index f33f00e40ef..d5cdc23fa36 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -115,6 +115,16 @@ public class ConnectorConfig extends AbstractConfig { private static final String 
TASK_MAX_DISPLAY = "Tasks max"; + public static final String TASKS_MAX_ENFORCE_CONFIG = "tasks.max.enforce"; + private static final String TASKS_MAX_ENFORCE_DOC = + "(Deprecated) Whether to enforce that the tasks.max property is respected by the connector. " + + "By default, connectors that generate too many tasks will fail, and existing sets of tasks that exceed the tasks.max property will also be failed. " + + "If this property is set to false, then connectors will be allowed to generate more than the maximum number of tasks, and existing sets of " + + "tasks that exceed the tasks.max property will be allowed to run. " + + "This property is deprecated and will be removed in an upcoming major release."; + public static final boolean TASKS_MAX_ENFORCE_DEFAULT = true; + private static final String TASKS_MAX_ENFORCE_DISPLAY = "Enforce tasks max"; + public static final String TRANSFORMS_CONFIG = "transforms"; private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records."; private static final String TRANSFORMS_DISPLAY = "Transforms"; @@ -195,6 +205,7 @@ public class ConnectorConfig extends AbstractConfig { .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY) .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY) .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY) + .define(TASKS_MAX_ENFORCE_CONFIG, Type.BOOLEAN, TASKS_MAX_ENFORCE_DEFAULT, Importance.LOW, TASKS_MAX_ENFORCE_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASKS_MAX_ENFORCE_DISPLAY) .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY) .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY) @@ -281,6 +292,14 @@ public class ConnectorConfig extends AbstractConfig { return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG); } + public int tasksMax() { + return getInt(TASKS_MAX_CONFIG); + } + + public boolean enforceTasksMax() { + return getBoolean(TASKS_MAX_ENFORCE_CONFIG); + } + /** * Returns the initialized list of {@link TransformationStage} which apply the * {@link Transformation transformations} and {@link Predicate predicates} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java new file mode 100644 index 00000000000..62a2ea5a733 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TooManyTasksException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.errors.ConnectException; + +/** + * Thrown when a connector has generated too many task configs (i.e., more tasks than + * the value for {@link ConnectorConfig#TASKS_MAX_CONFIG tasks.max} that it has + * been configured with). + */ +public class TooManyTasksException extends ConnectException { + + public TooManyTasksException(String connName, int numTasks, int maxTasks) { + super(String.format( + "The connector %s has generated %d tasks, which is greater than %d, " + + "the maximum number of tasks it is configured to create. " + + "This behaviour should be considered a bug and is disallowed. " + + "If necessary, it can be permitted by reconfiguring the connector " + + "with '%s' set to false; however, this option will be removed in a " + + "future release of Kafka Connect.", + connName, + numTasks, + maxTasks, + ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG + )); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index abdf064d0bb..81ab8617247 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -389,13 +389,21 @@ public class Worker { if (workerConnector == null) throw new ConnectException("Connector " + connName + " not found in this worker."); - int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); + int maxTasks = connConfig.tasksMax(); Map<String, String> connOriginals = connConfig.originalsStrings(); Connector connector = workerConnector.connector(); try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) { String taskClassName = connector.taskClass().getName(); - for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { + List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks); + try { + checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax()); + } catch (TooManyTasksException e) { + // TODO: This control flow is awkward. Push task config generation into WorkerConnector class? + workerConnector.fail(e); + throw e; + } + for (Map<String, String> taskProps : taskConfigs) { // Ensure we don't modify the connector's copy of the config Map<String, String> taskConfig = new HashMap<>(taskProps); taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); @@ -413,6 +421,26 @@ public class Worker { return result; } + private void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) { + if (numTasks > maxTasks) { + if (enforce) { + throw new TooManyTasksException(connName, numTasks, maxTasks); + } else { + log.warn( + "The connector {} has generated {} tasks, which is greater than {}, " + + "the maximum number of tasks it is configured to create. 
" + + "This behavior should be considered a bug and will be disallowed " + + "in future releases of Kafka Connect. Please report this to the " + + "maintainers of the connector and request that they adjust their " + + "connector's taskConfigs() method to respect the maxTasks parameter.", + connName, + numTasks, + maxTasks + ); + } + } + } + /** * Stop a connector managed by this worker. * @@ -522,12 +550,12 @@ public class Worker { /** * Start a sink task managed by this worker. * - * @param id the task ID. - * @param configState the most recent {@link ClusterConfigState} known to the worker - * @param connProps the connector properties. - * @param taskProps the tasks properties. + * @param id the task ID. + * @param configState the most recent {@link ClusterConfigState} known to the worker + * @param connProps the connector properties. + * @param taskProps the tasks properties. * @param statusListener a listener for the runtime status transitions of the task. - * @param initialState the initial state of the connector. + * @param initialState the initial state of the connector. * @return true if the task started successfully. */ public boolean startSinkTask( @@ -538,19 +566,19 @@ public class Worker { TaskStatus.Listener statusListener, TargetState initialState ) { - return startTask(id, connProps, taskProps, statusListener, + return startTask(id, connProps, taskProps, configState, statusListener, new SinkTaskBuilder(id, configState, statusListener, initialState)); } /** * Start a source task managed by this worker using older behavior that does not provide exactly-once support. * - * @param id the task ID. - * @param configState the most recent {@link ClusterConfigState} known to the worker - * @param connProps the connector properties. - * @param taskProps the tasks properties. + * @param id the task ID. + * @param configState the most recent {@link ClusterConfigState} known to the worker + * @param connProps the connector properties. + * @param taskProps the tasks properties. * @param statusListener a listener for the runtime status transitions of the task. - * @param initialState the initial state of the connector. + * @param initialState the initial state of the connector. * @return true if the task started successfully. */ public boolean startSourceTask( @@ -561,20 +589,20 @@ public class Worker { TaskStatus.Listener statusListener, TargetState initialState ) { - return startTask(id, connProps, taskProps, statusListener, + return startTask(id, connProps, taskProps, configState, statusListener, new SourceTaskBuilder(id, configState, statusListener, initialState)); } /** * Start a source task with exactly-once support managed by this worker. * - * @param id the task ID. - * @param configState the most recent {@link ClusterConfigState} known to the worker - * @param connProps the connector properties. - * @param taskProps the tasks properties. - * @param statusListener a listener for the runtime status transitions of the task. - * @param initialState the initial state of the connector. - * @param preProducerCheck a preflight check that should be performed before the task initializes its transactional producer. + * @param id the task ID. + * @param configState the most recent {@link ClusterConfigState} known to the worker + * @param connProps the connector properties. + * @param taskProps the tasks properties. + * @param statusListener a listener for the runtime status transitions of the task. + * @param initialState the initial state of the connector. 
+ * @param preProducerCheck a preflight check that should be performed before the task initializes its transactional producer. * @param postProducerCheck a preflight check that should be performed after the task initializes its transactional producer, * but before producing any source records or offsets. * @return true if the task started successfully. @@ -589,7 +617,7 @@ public class Worker { Runnable preProducerCheck, Runnable postProducerCheck ) { - return startTask(id, connProps, taskProps, statusListener, + return startTask(id, connProps, taskProps, configState, statusListener, new ExactlyOnceSourceTaskBuilder(id, configState, statusListener, initialState, preProducerCheck, postProducerCheck)); } @@ -599,6 +627,7 @@ public class Worker { * @param id the task ID. * @param connProps the connector properties. * @param taskProps the tasks properties. + * @param configState the most recent {@link ClusterConfigState} known to the worker * @param statusListener a listener for the runtime status transitions of the task. * @param taskBuilder the {@link TaskBuilder} used to create the {@link WorkerTask} that manages the lifecycle of the task. * @return true if the task started successfully. @@ -607,6 +636,7 @@ public class Worker { ConnectorTaskId id, Map<String, String> connProps, Map<String, String> taskProps, + ClusterConfigState configState, TaskStatus.Listener statusListener, TaskBuilder<?, ?> taskBuilder ) { @@ -624,6 +654,11 @@ public class Worker { try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); + + int maxTasks = connConfig.tasksMax(); + int numTasks = configState.taskCount(id.connector()); + checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax()); + final TaskConfig taskConfig = new TaskConfig(taskProps); final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); final Task task = plugins.newTask(taskClass); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index f2a289f92c7..4bbe1f42b8b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -73,6 +73,7 @@ public class WorkerConnector implements Runnable { private final AtomicReference<TargetState> pendingTargetStateChange; private final AtomicReference<Callback<TargetState>> pendingStateChangeCallback; private final CountDownLatch shutdownLatch; + private volatile Throwable externalFailure; private volatile boolean stopping; // indicates whether the Worker has asked the connector to stop private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown) @@ -102,6 +103,7 @@ public class WorkerConnector implements Runnable { this.pendingTargetStateChange = new AtomicReference<>(); this.pendingStateChangeCallback = new AtomicReference<>(); this.shutdownLatch = new CountDownLatch(1); + this.externalFailure = null; this.stopping = false; this.cancelled = false; } @@ -131,9 +133,27 @@ public class WorkerConnector implements Runnable { } } + /** + * Fail the connector. 
+ * @param cause the cause of the failure; if null, the connector will not be failed + */ + public void fail(Throwable cause) { + synchronized (this) { + if (this.externalFailure != null) + return; + log.error("{} Connector has failed", this, cause); + this.externalFailure = cause; + notify(); + } + } + void doRun() { initialize(); while (!stopping) { + Throwable failure = externalFailure; + if (failure != null) + onFailure(failure); + TargetState newTargetState; Callback<TargetState> stateChangeCallback; synchronized (this) { @@ -144,7 +164,10 @@ public class WorkerConnector implements Runnable { doTransitionTo(newTargetState, stateChangeCallback); } synchronized (this) { - if (pendingTargetStateChange.get() != null || stopping) { + if (pendingTargetStateChange.get() != null + || (!State.FAILED.equals(state) && externalFailure != null) + || stopping + ) { // An update occurred before we entered the synchronized block; no big deal, // just start the loop again until we've handled everything } else { @@ -203,7 +226,11 @@ public class WorkerConnector implements Runnable { } } - private void onFailure(Throwable t) { + private synchronized void onFailure(Throwable t) { + // If we've already failed, we don't overwrite the last-reported cause of failure + if (this.state == State.FAILED) + return; + statusListener.onFailure(connName, t); this.state = State.FAILED; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 324f4cd5de7..c4e5bb15584 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -50,6 +50,7 @@ import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TooManyTasksException; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; @@ -154,7 +155,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1); private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS = 250; - private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 60000; + static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 60000; private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250; private static final int START_STOP_THREAD_POOL_SIZE = 8; private static final short BACKOFF_RETRIES = 5; @@ -2139,6 +2140,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (error != null) { if (isPossibleExpiredKeyException(initialRequestTime, error)) { log.debug("Failed to reconfigure connector's tasks ({}), possibly due to expired session key. 
Retrying after backoff", connName); + } else if (error instanceof TooManyTasksException) { + log.debug("Connector {} generated too many tasks; will not retry configuring connector", connName); + return; } else { log.error("Failed to reconfigure connector's tasks ({}), retrying after backoff.", connName, error); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 8465a1e31e3..d39632a48e5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; @@ -146,6 +147,7 @@ public class BlockingConnectorTest { connect.stop(); // unblock everything so that we don't leak threads after each test run Block.reset(); + Block.join(); } @Test @@ -442,11 +444,29 @@ public class BlockingConnectorTest { resetAwaitBlockLatch(); BLOCK_LATCHES.forEach(CountDownLatch::countDown); BLOCK_LATCHES.clear(); + } + + /** + * {@link Thread#join(long millis) Await} the termination of all threads that have been + * intentionally blocked either since the last invocation of this method or, if this method + * has never been invoked, all threads that have ever been blocked. + */ + public static synchronized void join() { BLOCKED_THREADS.forEach(t -> { try { t.join(30_000); if (t.isAlive()) { - log.warn("Thread {} failed to finish in time", t); + log.warn( + "Thread {} failed to finish in time; current stack trace:\n{}", + t, + Stream.of(t.getStackTrace()) + .map(s -> String.format( + "\t%s.%s:%d", + s.getClassName(), + s.getMethodName(), + s.getLineNumber() + )).collect(Collectors.joining("\n")) + ); } } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting for blocked thread " + t + " to finish"); @@ -777,7 +797,7 @@ public class BlockingConnectorTest { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { - return IntStream.rangeClosed(0, maxTasks) + return IntStream.range(0, maxTasks) .mapToObj(i -> new HashMap<>(props)) .collect(Collectors.toList()); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 9dca7425d66..d651ef87090 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -18,12 +18,17 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import 
org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.storage.KafkaConfigBackingStore; import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; @@ -54,6 +59,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; @@ -881,6 +887,159 @@ public class ConnectWorkerIntegrationTest { connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); } + /** + * Tests the logic around enforcement of the + * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max} + * property and how it can be toggled via the + * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG tasks.max.enforce} + * property, following the test plan laid out in + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>. + */ + @Test + public void testTasksMaxEnforcement() throws Exception { + String configTopic = "tasks-max-enforcement-configs"; + workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp( + NUM_WORKERS, + "Initial group of workers did not start in time." + ); + + Map<String, String> connectorProps = defaultSourceConnectorProps(TOPIC_NAME); + int maxTasks = 1; + connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks)); + int numTasks = 2; + connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + + // A connector that generates excessive tasks will be failed with an expected error message + connect.assertions().assertConnectorIsFailedAndTasksHaveFailed( + CONNECTOR_NAME, + 0, + "connector did not fail in time" + ); + + String expectedErrorSnippet = String.format( + "The connector %s has generated %d tasks, which is greater than %d, " + + "the maximum number of tasks it is configured to create. 
", + CONNECTOR_NAME, + numTasks, + maxTasks + ); + String errorMessage = connect.connectorStatus(CONNECTOR_NAME).connector().trace(); + assertThat(errorMessage, containsString(expectedErrorSnippet)); + + // Stop all workers in the cluster + connect.workers().forEach(connect::removeWorker); + + // Publish a set of too many task configs to the config topic, to simulate + // an existing set of task configs that was written before the cluster was upgraded + try (JsonConverter converter = new JsonConverter()) { + converter.configure( + Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), + false + ); + + for (int i = 0; i < numTasks; i++) { + Map<String, String> taskConfig = MonitorableSourceConnector.taskConfig( + connectorProps, + CONNECTOR_NAME, + i + ); + Struct wrappedTaskConfig = new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0) + .put("properties", taskConfig); + String key = KafkaConfigBackingStore.TASK_KEY(new ConnectorTaskId(CONNECTOR_NAME, i)); + byte[] value = converter.fromConnectData( + configTopic, + KafkaConfigBackingStore.TASK_CONFIGURATION_V0, + wrappedTaskConfig + ); + connect.kafka().produce(configTopic, key, new String(value)); + } + + Struct taskCommitMessage = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0); + taskCommitMessage.put("tasks", numTasks); + String key = KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME); + byte[] value = converter.fromConnectData( + configTopic, + KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, + taskCommitMessage + ); + connect.kafka().produce(configTopic, key, new String(value)); + } + + // Restart all the workers in the cluster + for (int i = 0; i < NUM_WORKERS; i++) + connect.addWorker(); + + // An existing set of tasks that exceeds the tasks.max property + // will be failed with an expected error message + connect.assertions().assertConnectorIsFailedAndTasksHaveFailed( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not fail in time" + ); + + connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "false"); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + + // That same existing set of tasks will be allowed to run + // once the connector is reconfigured with tasks.max.enforce set to false + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not start in time" + ); + + numTasks++; + connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + + // A connector will be allowed to generate excessive tasks when tasks.max.enforce is set to false + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not start in time" + ); + + numTasks = maxTasks; + connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks)); + connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "true"); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not start in time" + ); + + numTasks = maxTasks + 1; + connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + + // A connector that generates excessive tasks after being reconfigured will be failed, but its existing tasks will continue running + 
connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning( + CONNECTOR_NAME, + maxTasks, + "connector did not fail in time, or tasks were incorrectly failed" + ); + + // Make sure that the tasks have had a chance to fail (i.e., that the worker has been given + // a chance to check on the number of tasks for the connector during task startup) + for (int i = 0; i < maxTasks; i++) + connect.restartTask(CONNECTOR_NAME, i); + + // Verify one more time that none of the tasks have actually failed + connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning( + CONNECTOR_NAME, + maxTasks, + "connector did not fail in time, or tasks were incorrectly failed" + ); + } + private Map<String, String> defaultSourceConnectorProps(String topic) { // setup props for the source connector Map<String, String> props = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java index 1645aa0ecc2..049c75727a0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java @@ -51,6 +51,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector { private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class); public static final String TOPIC_CONFIG = "topic"; + public static final String NUM_TASKS = "num.tasks"; public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll"; public static final String MAX_MESSAGES_PER_SECOND_CONFIG = "throughput"; public static final String MAX_MESSAGES_PRODUCED_CONFIG = "max.messages"; @@ -93,16 +94,27 @@ public class MonitorableSourceConnector extends SampleSourceConnector { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { + String numTasksProp = commonConfigs.get(NUM_TASKS); + int numTasks = numTasksProp != null ? Integer.parseInt(numTasksProp) : maxTasks; List<Map<String, String>> configs = new ArrayList<>(); - for (int i = 0; i < maxTasks; i++) { - Map<String, String> config = new HashMap<>(commonConfigs); - config.put("connector.name", connectorName); - config.put("task.id", taskId(connectorName, i)); + for (int i = 0; i < numTasks; i++) { + Map<String, String> config = taskConfig(commonConfigs, connectorName, i); configs.add(config); } return configs; } + public static Map<String, String> taskConfig( + Map<String, String> connectorProps, + String connectorName, + int taskNum + ) { + Map<String, String> result = new HashMap<>(connectorProps); + result.put("connector.name", connectorName); + result.put("task.id", taskId(connectorName, taskNum)); + return result; + } + @Override public void stop() { log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index f4c79ab44a3..b632267598f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -119,6 +119,8 @@ public class OffsetsApiIntegrationTest { public static void close() { // stop all Connect, Kafka and Zk threads. 
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); + // wait for all blocked threads created while testing zombie task scenarios to finish + BlockingConnectorTest.Block.join(); } private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) { @@ -469,6 +471,7 @@ public class OffsetsApiIntegrationTest { () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); + // clean up blocked threads created while testing zombie task scenarios BlockingConnectorTest.Block.reset(); } @@ -810,6 +813,7 @@ public class OffsetsApiIntegrationTest { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName)); assertThat(e.getMessage(), containsString("zombie sink task")); + // clean up blocked threads created while testing zombie task scenarios BlockingConnectorTest.Block.reset(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 7d7020604cc..7c39ce536be 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -470,8 +470,8 @@ public class AbstractHerderTest { assertEquals(2, result.errorCount()); Map<String, ConfigInfo> infos = result.values().stream() .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity())); - // Base connector config has 14 fields, connector's configs add 7 - assertEquals(21, infos.size()); + // Base connector config has 15 fields, connector's configs add 7 + assertEquals(22, infos.size()); // Missing name should generate an error assertEquals(ConnectorConfig.NAME_CONFIG, infos.get(ConnectorConfig.NAME_CONFIG).configValue().name()); @@ -582,7 +582,7 @@ public class AbstractHerderTest { assertEquals(1, result.errorCount()); Map<String, ConfigInfo> infos = result.values().stream() .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity())); - assertEquals(26, infos.size()); + assertEquals(27, infos.size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name()); @@ -639,7 +639,7 @@ public class AbstractHerderTest { assertEquals(1, result.errorCount()); Map<String, ConfigInfo> infos = result.values().stream() .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity())); - assertEquals(28, infos.size()); + assertEquals(29, infos.size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name()); assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty()); @@ -700,8 +700,8 @@ public class AbstractHerderTest { ); assertEquals(expectedGroups, result.groups()); assertEquals(1, result.errorCount()); - // Base connector config has 14 fields, connector's configs add 7, and 2 producer overrides - assertEquals(23, result.values().size()); + // Base connector config has 15 fields, connector's configs add 7, and 2 producer overrides + assertEquals(24, result.values().size()); assertTrue(result.values().stream().anyMatch( configInfo -> 
ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty())); assertTrue(result.values().stream().anyMatch( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index e424da0bc93..c48abb0130a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -130,6 +131,8 @@ import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_C import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; @@ -619,7 +622,21 @@ public class WorkerTest { assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); - worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); + + Map<String, String> connectorConfigs = anyConnectorConfigMap(); + ClusterConfigState configState = new ClusterConfigState( + 0, + null, + Collections.singletonMap(CONNECTOR_ID, 1), + Collections.singletonMap(CONNECTOR_ID, connectorConfigs), + Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED), + Collections.singletonMap(TASK_ID, origProps), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet() + ); + assertTrue(worker.startSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED)); assertStatistics(worker, 0, 1); assertEquals(Collections.singleton(TASK_ID), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); @@ -662,7 +679,19 @@ public class WorkerTest { connectorConfigs.put(TOPICS_CONFIG, "t1"); connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName()); - worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED); + ClusterConfigState configState = new ClusterConfigState( + 0, + null, + Collections.singletonMap(CONNECTOR_ID, 1), + Collections.singletonMap(CONNECTOR_ID, connectorConfigs), + Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED), + Collections.singletonMap(TASK_ID, origProps), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet() + ); + 
assertTrue(worker.startSinkTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED)); assertStatistics(worker, 0, 1); assertEquals(Collections.singleton(TASK_ID), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); @@ -718,7 +747,22 @@ public class WorkerTest { assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); - worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer); + + Map<String, String> connectorConfigs = anyConnectorConfigMap(); + ClusterConfigState configState = new ClusterConfigState( + 0, + null, + Collections.singletonMap(CONNECTOR_ID, 1), + Collections.singletonMap(CONNECTOR_ID, connectorConfigs), + Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED), + Collections.singletonMap(TASK_ID, origProps), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet() + ); + + assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer)); assertStatistics(worker, 0, 1); assertEquals(Collections.singleton(TASK_ID), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); @@ -784,7 +828,7 @@ public class WorkerTest { ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, - taskStatusListener, + taskStatusListener, TargetState.STARTED); assertStatusMetrics(1L, "connector-running-task-count"); @@ -2522,6 +2566,228 @@ public class WorkerTest { verifyKafkaClusterId(); } + @Test + public void testConnectorGeneratesTooManyTasksButMaxNotEnforced() throws Exception { + testConnectorGeneratesTooManyTasks(false); + } + + @Test + public void testConnectorGeneratesTooManyTasksAndMaxEnforced() throws Exception { + testConnectorGeneratesTooManyTasks(true); + } + + private void testConnectorGeneratesTooManyTasks(boolean enforced) throws Exception { + mockKafkaClusterId(); + + String connectorClass = SampleSourceConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced)); + mockConnectorIsolation(connectorClass, sourceConnector); + + mockExecutorRealSubmit(WorkerConnector.class); + + // Use doReturn().when() syntax due to when().thenReturn() not being able to return wildcard generic types + doReturn(SampleSourceConnector.SampleSourceTask.class).when(sourceConnector).taskClass(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, allConnectorClientConfigOverridePolicy); + worker.start(); + + FutureCallback<TargetState> onFirstStart = new FutureCallback<>(); + worker.startConnector(CONNECTOR_ID, connectorProps, ctx, connectorStatusListener, TargetState.STARTED, onFirstStart); + // Wait for the connector to actually start + assertEquals(TargetState.STARTED, onFirstStart.get(1000, TimeUnit.MILLISECONDS)); + + Map<String, String> taskConfig = new HashMap<>(); + + // No warnings or exceptions when a connector generates an empty list of task configs + when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList()); + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { + connectorProps.put(TASKS_MAX_CONFIG, "1"); + List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); + assertEquals(0, taskConfigs.size()); + 
assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().equals("WARN"))); + } + + // No warnings or exceptions when a connector generates the maximum permitted number of task configs + when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig)); + when(sourceConnector.taskConfigs(2)).thenReturn(Arrays.asList(taskConfig, taskConfig)); + when(sourceConnector.taskConfigs(3)).thenReturn(Arrays.asList(taskConfig, taskConfig, taskConfig)); + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { + connectorProps.put(TASKS_MAX_CONFIG, "1"); + List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); + assertEquals(1, taskConfigs.size()); + + connectorProps.put(TASKS_MAX_CONFIG, "2"); + taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); + assertEquals(2, taskConfigs.size()); + + connectorProps.put(TASKS_MAX_CONFIG, "3"); + taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); + assertEquals(3, taskConfigs.size()); + + assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN")); + assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR")); + } + + // Warning/exception when a connector generates too many task configs + List<Map<String, String>> tooManyTaskConfigs = Arrays.asList(taskConfig, taskConfig, taskConfig, taskConfig); + when(sourceConnector.taskConfigs(1)).thenReturn(tooManyTaskConfigs); + when(sourceConnector.taskConfigs(2)).thenReturn(tooManyTaskConfigs); + when(sourceConnector.taskConfigs(3)).thenReturn(tooManyTaskConfigs); + for (int i = 0; i < 3; i++) { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { + int tasksMax = i + 1; + connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax)); + String tasksMaxExceededMessage; + if (enforced) { + TooManyTasksException e = assertThrows( + TooManyTasksException.class, + () -> worker.connectorTaskConfigs( + CONNECTOR_ID, + new ConnectorConfig(plugins, connectorProps) + ) + ); + tasksMaxExceededMessage = e.getMessage(); + } else { + List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs( + CONNECTOR_ID, + new ConnectorConfig(plugins, connectorProps) + ); + assertEquals(tooManyTaskConfigs.size(), taskConfigs.size()); + List<String> warningMessages = logCaptureAppender.getMessages("WARN"); + assertEquals(1, warningMessages.size()); + tasksMaxExceededMessage = warningMessages.get(0); + } + assertTasksMaxExceededMessage( + CONNECTOR_ID, + tooManyTaskConfigs.size(), tasksMax, + tasksMaxExceededMessage + ); + + // Regardless of enforcement, there should never be any error-level log messages + assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR")); + } + } + + // One last sanity check in case the connector is reconfigured and respects tasks.max + when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig)); + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { + connectorProps.put(TASKS_MAX_CONFIG, "1"); + List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); + assertEquals(1, taskConfigs.size()); + + assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN")); + assertEquals(Collections.emptyList(), 
logCaptureAppender.getMessages("ERROR")); + } + + worker.stop(); + } + + @Test + public void testStartTaskWithTooManyTaskConfigsButMaxNotEnforced() { + testStartTaskWithTooManyTaskConfigs(false); + } + + @Test + public void testStartTaskWithTooManyTaskConfigsAndMaxEnforced() { + testStartTaskWithTooManyTaskConfigs(true); + } + + private void testStartTaskWithTooManyTaskConfigs(boolean enforced) { + SinkTask task = mock(TestSinkTask.class); + mockKafkaClusterId(); + + Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName()); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, + noneConnectorClientConfigOverridePolicy, null); + worker.herder = herder; + worker.start(); + + assertStatistics(worker, 0, 0); + assertEquals(Collections.emptySet(), worker.taskIds()); + Map<String, String> connectorConfigs = anyConnectorConfigMap(); + connectorConfigs.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced)); + connectorConfigs.put(TOPICS_CONFIG, "t1"); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName()); + // The connector is configured to generate at most one task config... + int maxTasks = 1; + connectorConfigs.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks)); + + String connName = TASK_ID.connector(); + int numTasks = 2; + ClusterConfigState configState = new ClusterConfigState( + 0, + null, + // ... but it has generated two task configs + Collections.singletonMap(connName, numTasks), + Collections.singletonMap(connName, connectorConfigs), + Collections.singletonMap(connName, TargetState.STARTED), + Collections.singletonMap(TASK_ID, origProps), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet() + ); + + String tasksMaxExceededMessage; + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { + if (enforced) { + assertFalse(worker.startSinkTask( + TASK_ID, + configState, + connectorConfigs, + origProps, + taskStatusListener, + TargetState.STARTED + )); + + ArgumentCaptor<Throwable> failureCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(taskStatusListener, times(1)).onFailure(eq(TASK_ID), failureCaptor.capture()); + assertTrue( + "Expected task start exception to be TooManyTasksException, but was " + + failureCaptor.getValue().getClass() + " instead", + failureCaptor.getValue() instanceof TooManyTasksException + ); + + tasksMaxExceededMessage = failureCaptor.getValue().getMessage(); + } else { + mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); + mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockExecutorFakeSubmit(WorkerTask.class); + + assertTrue(worker.startSinkTask( + TASK_ID, + configState, + connectorConfigs, + origProps, + taskStatusListener, + TargetState.STARTED + )); + + List<String> warningMessages = logCaptureAppender.getMessages("WARN"); + assertEquals(1, warningMessages.size()); + tasksMaxExceededMessage = warningMessages.get(0); + } + assertTasksMaxExceededMessage(connName, numTasks, maxTasks, tasksMaxExceededMessage); + } + } + + private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) { + 
String expectedPrefix = "The connector " + connector + + " has generated " + + numTasks + " tasks, which is greater than " + + maxTasks; + assertTrue( + "Warning/exception message '" + + message + "' did not start with the expected prefix '" + + expectedPrefix + "'", + message.startsWith(expectedPrefix) + ); + } + private void assertStatusMetrics(long expected, String metricName) { MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector()); if (expected == 0L) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 73911dcdcd2..6c5ce0fdb53 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TooManyTasksException; import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -3267,6 +3268,47 @@ public class DistributedHerderTest { expectAndVerifyTaskReconfigurationRetries(); } + @Test + public void testTaskReconfigurationNoRetryWithTooManyTasks() { + // initial tick + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + + when(worker.isRunning(CONN1)).thenReturn(true); + when(worker.getPlugins()).thenReturn(plugins); + + herder.tick(); + // No requests are queued, so we shouldn't plan on waking up without external action + // (i.e., rebalance, user request, or shutdown) + // This helps indicate that no retriable operations (such as generating task configs after + // a backoff period) are queued up by the worker + verify(member, times(1)).poll(eq(Long.MAX_VALUE), any()); + + // Process the task reconfiguration request in this tick + int numTasks = MAX_TASKS + 5; + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); + // Fail to generate tasks because the connector provided too many task configs + when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)) + .thenThrow(new TooManyTasksException(CONN1, numTasks, MAX_TASKS)); + + herder.requestTaskReconfiguration(CONN1); + herder.tick(); + // We tried to generate task configs for the connector one time during this tick + verify(worker, times(1)).connectorTaskConfigs(CONN1, sinkConnectorConfig); + // Verifying again that no requests are queued + verify(member, times(2)).poll(eq(Long.MAX_VALUE), any()); + verifyNoMoreInteractions(worker); + + time.sleep(DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS); + herder.tick(); + // We ticked one more time, and no further attempt was made to generate task configs + verifyNoMoreInteractions(worker); + // And we don't have any requests queued + verify(member, times(3)).poll(eq(Long.MAX_VALUE), any()); + } + @Test public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() { herder = 
mock(DistributedHerder.class, withSettings().defaultAnswer(CALLS_REAL_METHODS).useConstructor(new DistributedConfig(HERDER_CONFIG), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java index e2daad66fca..515d0267460 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java @@ -430,6 +430,29 @@ public class ConnectAssertions { ); } + /** + * Assert that a connector is in FAILED state, that it has a specific number of tasks, and that all of + * its tasks are in the RUNNING state. + * + * @param connectorName the connector name + * @param numTasks the number of tasks + * @param detailMessage the assertion message + * @throws InterruptedException + */ + public void assertConnectorIsFailedAndNumTasksAreRunning(String connectorName, int numTasks, String detailMessage) + throws InterruptedException { + waitForConnectorState( + connectorName, + AbstractStatus.State.FAILED, + exactly(numTasks), + null, + AbstractStatus.State.RUNNING, + "Either the connector is running or not all the " + numTasks + " tasks are running.", + detailMessage, + CONNECTOR_SETUP_DURATION_MS + ); + } + /** * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted. * diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index af6c60e847e..0e58d633584 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -351,6 +351,23 @@ abstract class EmbeddedConnect { } } + /** + * Restart an existing task. + * + * @param connName name of the connector + * @param taskNum ID of the task (starting from 0) + * @throws ConnectRestException if the REST API returns error status + * @throws ConnectException for any other error. + */ + public void restartTask(String connName, int taskNum) { + String url = endpointForResource(String.format("connectors/%s/tasks/%d/restart", connName, taskNum)); + Response response = requestPost(url, "", Collections.emptyMap()); + if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { + throw new ConnectRestException(response.getStatus(), + "Could not execute POST request. Error response: " + responseToString(response)); + } + } + /** * Restart an existing connector and its tasks. *
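
For connector developers affected by this change: the warning added in Worker.checkTasksMax() asks maintainers to adjust their connector's taskConfigs() method to respect the maxTasks parameter. A minimal sketch of a compliant implementation follows; it is illustrative only and not part of this commit, and the class name and the "desired.tasks" property are hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Illustrative sketch: a connector whose taskConfigs() never generates more task
// configs than the maxTasks argument (derived from tasks.max), so the enforcement
// introduced by this commit is never triggered. "desired.tasks" is a hypothetical
// connector-specific property, not one defined by Kafka Connect.
public abstract class TasksMaxRespectingConnector extends SourceConnector {

    private Map<String, String> connectorProps;

    @Override
    public void start(Map<String, String> props) {
        this.connectorProps = props;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        int desiredTasks = Integer.parseInt(connectorProps.getOrDefault("desired.tasks", "1"));
        // Cap the number of generated task configs at maxTasks
        int numTasks = Math.min(desiredTasks, maxTasks);
        List<Map<String, String>> taskConfigs = new ArrayList<>(numTasks);
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(connectorProps);
            taskConfig.put("task.id", Integer.toString(i));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }
}

Operators who cannot immediately upgrade an offending connector can set tasks.max.enforce=false in the connector configuration as a temporary workaround; as noted in the new config documentation above, that property is deprecated and will be removed in an upcoming major release.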