C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450882720

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -619,7 +622,21 @@ public void testAddRemoveSourceTask() {
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+
+        Map<String, String> connectorConfigs = anyConnectorConfigMap();
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                Collections.singletonMap(CONNECTOR_ID, 1),
+                Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+                Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+        assertTrue(worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));

Review Comment:
   Good catch, thanks 🙏

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -718,7 +747,22 @@ public void testAddRemoveExactlyOnceSourceTask() {
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer);
+
+        Map<String, String> connectorConfigs = anyConnectorConfigMap();
+        ClusterConfigState configState = new ClusterConfigState(
+                0,
+                null,
+                Collections.singletonMap(CONNECTOR_ID, 1),
+                Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+                Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+                Collections.singletonMap(TASK_ID, origProps),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+
+        assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer));

Review Comment:
   Same as above

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, String expectedStageDesc
         connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
     }
+    /**
+     * Tests the logic around enforcement of the
+     * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+     * property and how it can be toggled via the
+     * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG tasks.max.enforce}
+     * property, following the test plain laid out in
+     * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+     */
+    @Test
+    public void testTasksMaxEnforcement() throws Exception {
+        String configTopic = "tasks-max-enforcement-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                NUM_WORKERS,
+                "Initial group of workers did not start in time."
+        );
+
+        Map<String, String> connectorProps = defaultSourceConnectorProps(TOPIC_NAME);
+        int maxTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+        int numTasks = 2;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector that generates excessive tasks will be failed with an expected error message
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                0,
+                "connector did not fail in time"
+        );
+
+        String expectedErrorSnippet = String.format(
+                "The connector %s has generated %d tasks, which is greater than %d, "
+                        + "the maximum number of tasks it is configured to create. ",
+                CONNECTOR_NAME,
+                numTasks,
+                maxTasks
+        );
+        String errorMessage = connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+        assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+        // Stop all workers in the cluster
+        connect.workers().forEach(connect::removeWorker);
+
+        // Publish a set of too many task configs to the config topic, to simulate

Review Comment:
   I considered that approach, but the constructor dependencies for the `KafkaConfigBackingStore` (`WorkerConfigTransformer` (which itself requires a `Worker` instance) and `DistributedConfig` especially) were a bit too cumbersome to instantiate. We could refactor the constructor to provide something a bit friendlier but IMO it's not worth it since the existing manual code is only around 30 lines long. If you think it'd be worth it I can give it another shot, though.