tombentley commented on a change in pull request #10605:
URL: https://github.com/apache/kafka/pull/10605#discussion_r622876998



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();

Review comment:
       If stop throws we won't count down the latch. No harm will result except 
there will be an erroneous log messages about exceeding the stop timeout.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -824,7 +825,7 @@ private WorkerErrantRecordReporter 
createWorkerErrantRecordReporter(
         return null;
     }
 
-    private void stopTask(ConnectorTaskId taskId) {
+    private void stopTask(ConnectorTaskId taskId, long timeout) {

Review comment:
       `timeoutMs` would be unambigous

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {

Review comment:
       We should name the thread so that thread dumps are a bit more 
informative. I _think_ these should be daemon threads because if we're prepared 
to basically ignore the non-return of `task.stop()` during runtime I don't see 
why we'd block jvm exit for them. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();
+                        latch.countDown();
+                    }
+                }.start();
+                // Wait for thread to terminate, but not longer than timeout.
+                if (timeout <= 0) {

Review comment:
       It's not required to protect the await, but is to get the logging.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to