gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1449550586


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -620,6 +650,11 @@ private boolean startTask(
 
             try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
                 final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+                int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);

Review Comment:
   nit: make a `tasksMax` getter?
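
   A minimal sketch of what such a getter might look like. `ConnectorConfigSketch` and its map-backed `getInt` are hypothetical stand-ins so the example compiles on its own; in the PR the method would presumably live on `ConnectorConfig` and delegate to its existing `getInt`.

```java
import java.util.Map;

// Hypothetical stand-in for ConnectorConfig, reduced to what the getter needs.
class ConnectorConfigSketch {
    static final String TASKS_MAX_CONFIG = "tasks.max";

    private final Map<String, String> props;

    ConnectorConfigSketch(Map<String, String> props) {
        this.props = props;
    }

    // Simplified stand-in for the config lookup; throws NumberFormatException
    // if the key is missing or not an integer.
    int getInt(String key) {
        return Integer.parseInt(props.get(key));
    }

    // The suggested getter: callers no longer need to know the config key.
    int tasksMax() {
        return getInt(TASKS_MAX_CONFIG);
    }
}
```

   With this in place, the call site in `startTask` could read `int maxTasks = connConfig.tasksMax();`.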



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
             Connector connector = workerConnector.connector();

Review Comment:
   I didn't realize that this was the only place where the Connector escapes the WorkerConnector and a different thread interacts with the Connector object.
   
   I know the taskConfigs method is typically instantaneous, but maybe it would make sense for this call to eventually move to the WorkerConnector thread instead of the herder tick thread.
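
   To illustrate the idea, here is a rough sketch of handing the call off to a dedicated thread. Everything in it is an assumption: `ConnectorThreadSketch` is hypothetical, a single-thread executor stands in for the WorkerConnector thread (the real class runs its own loop), and `generateTaskConfigs` stands in for `connector.taskConfigs(maxTasks)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: only one thread ever touches the connector object.
class ConnectorThreadSketch implements AutoCloseable {
    static final String THREAD_NAME = "connector-thread-sketch";

    // Stand-in for the WorkerConnector thread.
    private final ExecutorService connectorThread = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, THREAD_NAME);
        thread.setDaemon(true);
        return thread;
    });

    // Stand-in for connector.taskConfigs(maxTasks); records the thread it ran on.
    private List<Map<String, String>> generateTaskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(Map.of(
                    "task.index", String.valueOf(i),
                    "generated.on", Thread.currentThread().getName()));
        }
        return configs;
    }

    // Called from the herder side: submits the work and waits for the result.
    List<Map<String, String>> taskConfigs(int maxTasks) {
        Callable<List<Map<String, String>>> call = () -> generateTaskConfigs(maxTasks);
        try {
            return connectorThread.submit(call).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for task configs", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task config generation failed", e.getCause());
        }
    }

    @Override
    public void close() {
        connectorThread.shutdownNow();
    }
}
```

   The caller still blocks here, so a slow `taskConfigs` would still stall the herder tick thread unless a timeout or callback is added; the sketch only shows the thread confinement.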



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -131,9 +133,26 @@ public void run() {
         }
     }
 
+    /**
+     * Fail the connector.
+     * @param cause the cause of the failure; if null, the connector will not be failed
+     */
+    public void fail(Throwable cause) {
+        synchronized (this) {
+            if (this.externalFailure != null)
+                return;
+            this.externalFailure = cause;
+            notify();
+        }
+    }
+
     void doRun() {
         initialize();
         while (!stopping) {
+            Throwable failure = externalFailure;
+            if (failure != null)
+                onFailure(failure);

Review Comment:
   Is this in danger of being called more than once, particularly if the connector has this problem and then a pause/resume request comes in? Is that a bad thing?
   
   It looks like the connector thread just waits in this loop until something external calls shutdown(), so I would expect onFailure to be called again each time something calls notify() on the worker connector thread.
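
   To make the concern concrete, here is a simplified, single-threaded model of the loop body. It is not the actual WorkerConnector code: `FailureLoopSketch`, `loopIteration`, and the `failureReported` guard are hypothetical, and each call to `loopIteration` stands for one wake-up of the connector thread.

```java
// Hypothetical model of the failure check inside doRun()'s loop.
class FailureLoopSketch {
    private Throwable externalFailure;
    private boolean failureReported; // hypothetical guard, not in the PR
    int onFailureCalls;

    // Mirrors the fail() method in the diff: only the first cause is kept.
    synchronized void fail(Throwable cause) {
        if (externalFailure != null) {
            return;
        }
        externalFailure = cause;
        notify();
    }

    // One pass of the loop body, as run after each wake-up of the thread.
    synchronized void loopIteration(boolean guarded) {
        Throwable failure = externalFailure;
        if (failure != null) {
            if (guarded && failureReported) {
                return; // already handled; skip the repeat call
            }
            failureReported = true;
            onFailure(failure);
        }
    }

    // Stand-in for the real failure handling; just counts invocations.
    void onFailure(Throwable failure) {
        onFailureCalls++;
    }
}
```

   Without the guard, three wake-ups after a single `fail()` produce three `onFailure` calls in this model; with the guard, only the first wake-up does. Whether repeated calls are actually harmful depends on what `onFailure` does in the real class.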



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -131,9 +133,26 @@ public void run() {
         }
     }
 
+    /**
+     * Fail the connector.
+     * @param cause the cause of the failure; if null, the connector will not be failed
+     */
+    public void fail(Throwable cause) {
+        synchronized (this) {
+            if (this.externalFailure != null)
+                return;
+            this.externalFailure = cause;
+            notify();
+        }
+    }
+
     void doRun() {
         initialize();
         while (!stopping) {
+            Throwable failure = externalFailure;
+            if (failure != null)

Review Comment:
   nit: curly braces



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
             Connector connector = workerConnector.connector();
             try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
                 String taskClassName = connector.taskClass().getName();
-                for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+                List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
+                try {
+                    checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax());
+                } catch (TooManyTasksException e) {
+                    // TODO: This control flow is awkward. Push task config generation into WorkerConnector class?

Review Comment:
   This makes sense to me; checkTasksMax could be public static, right?
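
   For reference, a sketch of what a static version might look like. The parameter list follows the call site in the diff; the body is an assumption (throw when enforcement is on, warn otherwise), and `TasksMaxChecksSketch` and `TooManyTasksSketchException` are hypothetical stand-ins for the real classes.

```java
// Hypothetical stand-in for TooManyTasksException.
class TooManyTasksSketchException extends RuntimeException {
    TooManyTasksSketchException(String message) {
        super(message);
    }
}

// Hypothetical holder class; in the PR the method would stay on Worker.
final class TasksMaxChecksSketch {
    private TasksMaxChecksSketch() {
    }

    // Uses no instance state, so it can be static and callable from WorkerConnector.
    public static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
        if (numTasks <= maxTasks) {
            return;
        }
        String message = "Connector " + connName + " generated " + numTasks
                + " task configs, which exceeds tasks.max of " + maxTasks;
        if (enforce) {
            throw new TooManyTasksSketchException(message);
        }
        // Assumed behavior when enforcement is off: warn and carry on.
        System.err.println("WARN " + message);
    }
}
```

   Because the check would take everything it needs as arguments, moving task config generation into WorkerConnector later should not require passing a Worker reference around.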



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
