C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450879034


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
         connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
     }
 
+    /**
+     * Tests the logic around enforcement of the
+     * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+     * property and how it can be toggled via the
+     * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+     * property, following the test plain laid out in
+     * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan";>KIP-1004</a>.
+     */
+    @Test
+    public void testTasksMaxEnforcement() throws Exception {
+        String configTopic = "tasks-max-enforcement-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                NUM_WORKERS,
+                "Initial group of workers did not start in time."
+        );
+
+        Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+        int maxTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+        int numTasks = 2;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector that generates excessive tasks will be failed with an 
expected error message
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                0,
+                "connector did not fail in time"
+        );
+
+        String expectedErrorSnippet = String.format(
+                "The connector %s has generated %d tasks, which is greater 
than %d, "
+                        + "the maximum number of tasks it is configured to 
create. ",
+                CONNECTOR_NAME,
+                numTasks,
+                maxTasks
+        );
+        String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+        assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+        // Stop all workers in the cluster
+        connect.workers().forEach(connect::removeWorker);
+
+        // Publish a set of too many task configs to the config topic, to 
simulate
+        // an existing set of task configs that was written before the cluster 
was upgraded
+        try (JsonConverter converter = new JsonConverter()) {
+            converter.configure(
+                    
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
+                    false
+            );
+
+            for (int i = 0; i < numTasks; i++) {
+                Map<String, String> taskConfig = 
MonitorableSourceConnector.taskConfig(
+                        connectorProps,
+                        CONNECTOR_NAME,
+                        i
+                );
+                Struct wrappedTaskConfig = new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0)
+                        .put("properties", taskConfig);
+                String key = KafkaConfigBackingStore.TASK_KEY(new 
ConnectorTaskId(CONNECTOR_NAME, i));
+                byte[] value = converter.fromConnectData(
+                        configTopic,
+                        KafkaConfigBackingStore.TASK_CONFIGURATION_V0,
+                        wrappedTaskConfig
+                );
+                connect.kafka().produce(configTopic, key, new String(value));
+            }
+
+            Struct taskCommitMessage = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+            taskCommitMessage.put("tasks", numTasks);
+            String key = 
KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME);
+            byte[] value = converter.fromConnectData(
+                    configTopic,
+                    KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0,
+                    taskCommitMessage
+            );
+            connect.kafka().produce(configTopic, key, new String(value));
+        }
+
+        // Restart all the workers in the cluster
+        for (int i = 0; i < NUM_WORKERS; i++)
+            connect.addWorker();
+
+        // An existing set of tasks that exceeds the tasks.max property
+        // will be failed with an expected error message
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not fail in time"
+        );
+
+        connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "false");
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // That same existing set of tasks will be allowed to run
+        // once the connector is reconfigured with tasks.max.enforce set to 
false
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks++;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector will be allowed to generate excessive tasks when 
tasks.max.enforce is set to false
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks = maxTasks;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "true");
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "connector and tasks did not start in time"
+        );
+
+        numTasks = maxTasks + 1;
+        connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // A connector that generates excessive tasks after being reconfigured 
will be failed, but its existing tasks will continue running
+        connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
+                CONNECTOR_NAME,
+                maxTasks,
+                "connector did not fail in time, or tasks were incorrectly 
failed"
+        );
+
+        // Make sure that the tasks have had a chance to fail (i.e., that the 
worker has been

Review Comment:
   never have i ever?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to