gharris1727 commented on code in PR #15180: URL: https://github.com/apache/kafka/pull/15180#discussion_r1449550586
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -620,6 +650,11 @@ private boolean startTask( try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); + + int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); Review Comment: nit: make a `tasksMax` getter? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector Connector connector = workerConnector.connector(); Review Comment: I didn't realize that this was the only place where Connector escapes the WorkerConnector, and a different thread interacts with the Connector object. I know the taskConfigs method is typically an instantaneous method, but maybe it would make sense for this to eventually move to the WorkerConnector thread instead of the herder tick thread. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -131,9 +133,26 @@ public void run() { } } + /** + * Fail the connector. + * @param cause the cause of the failure; if null, the connector will not be failed + */ + public void fail(Throwable cause) { + synchronized (this) { + if (this.externalFailure != null) + return; + this.externalFailure = cause; + notify(); + } + } + void doRun() { initialize(); while (!stopping) { + Throwable failure = externalFailure; + if (failure != null) + onFailure(failure); Review Comment: Is this in danger of being called more than once, particularly if the connector has this problem and then a pause/resume request comes in? Is that a bad thing? It looks like the connector thread just waits in this loop until something external calls shutdown(), so I would expect this to get called whenever someone notify()'s the worker connector thread. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -131,9 +133,26 @@ public void run() { } } + /** + * Fail the connector. + * @param cause the cause of the failure; if null, the connector will not be failed + */ + public void fail(Throwable cause) { + synchronized (this) { + if (this.externalFailure != null) + return; + this.externalFailure = cause; + notify(); + } + } + void doRun() { initialize(); while (!stopping) { + Throwable failure = externalFailure; + if (failure != null) Review Comment: nit: curly braces ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector Connector connector = workerConnector.connector(); try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) { String taskClassName = connector.taskClass().getName(); - for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { + List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks); + try { + checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax()); + } catch (TooManyTasksException e) { + // TODO: This control flow is awkward. Push task config generation into WorkerConnector class? Review Comment: This makes sense to me; checkTasksMax could be public static, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org