This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86daf8ce657 KAFKA-14913: Using 
ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect 
Runtime (#13594)
86daf8ce657 is described below

commit 86daf8ce6573eb79d6e78381dbab738055f914c4
Author: vamossagar12 <[email protected]>
AuthorDate: Mon May 8 20:09:47 2023 +0530

    KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close 
executors in Connect Runtime (#13594)
    
    #13557 introduced a utility method to shut down executors quietly. This PR 
uses that method to close executors in the Connect runtime, replacing the 
executor shutdown code that was previously duplicated across several classes.
    
    Note that there are a few more executors used in the Connect runtime, but 
their close methods don't follow this pattern of shutdown, awaitTermination and 
shutdownNow. Some of them have additional logic around the executor (like 
Worker), so those places are left unchanged.
    
    ---------
    
    Co-authored-by: Sagar Rao <[email protected]>
    
    Reviewers: Daniel Urban <[email protected]>, Yash Mayya 
<[email protected]>, Viktor Somogyi-Vass <[email protected]>
---
 .../org/apache/kafka/common/utils/ThreadUtils.java  | 21 ++++++++++++++++++---
 .../connect/runtime/SourceTaskOffsetCommitter.java  |  9 +--------
 .../org/apache/kafka/connect/runtime/Worker.java    | 16 ++--------------
 .../runtime/distributed/DistributedHerder.java      | 19 +++----------------
 .../runtime/standalone/StandaloneHerder.java        | 10 ++--------
 .../connect/storage/MemoryOffsetBackingStore.java   | 13 +------------
 .../runtime/SourceTaskOffsetCommitterTest.java      |  3 ++-
 7 files changed, 29 insertions(+), 62 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
index 19655538297..40ac9443cf2 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.common.utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +29,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * Utilities for working with threads.
  */
 public class ThreadUtils {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ThreadUtils.class);
     /**
      * Create a new ThreadFactory.
      *
@@ -56,20 +61,30 @@ public class ThreadUtils {
     }
 
     /**
-     * Shuts down an executor service with a timeout. After the timeout/on 
interrupt, the service is forcefully closed.
+     * Shuts down an executor service in two phases, first by calling shutdown 
to reject incoming tasks,
+     * and then calling shutdownNow, if necessary, to cancel any lingering 
tasks.
+     * After the timeout/on interrupt, the service is forcefully closed.
      * @param executorService The service to shut down.
      * @param timeout The timeout of the shutdown.
      * @param timeUnit The time unit of the shutdown timeout.
      */
     public static void shutdownExecutorServiceQuietly(ExecutorService 
executorService,
                                                       long timeout, TimeUnit 
timeUnit) {
-        executorService.shutdown();
+        executorService.shutdown(); // Disable new tasks from being submitted
         try {
+            // Wait a while for existing tasks to terminate
             if (!executorService.awaitTermination(timeout, timeUnit)) {
-                executorService.shutdownNow();
+                executorService.shutdownNow(); // Cancel currently executing 
tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!executorService.awaitTermination(timeout, timeUnit)) {
+                    log.error("Executor {} did not terminate in time", 
executorService);
+                }
             }
         } catch (InterruptedException e) {
+            // (Re-)Cancel if current thread also interrupted
             executorService.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
         }
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index 3aa85aa386f..d32c768a7fd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -68,14 +68,7 @@ class SourceTaskOffsetCommitter {
     }
 
     public void close(long timeoutMs) {
-        commitExecutorService.shutdown();
-        try {
-            if (!commitExecutorService.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)) {
-                log.error("Graceful shutdown of offset commitOffsets thread 
timed out.");
-            }
-        } catch (InterruptedException e) {
-            // ignore and allow to exit immediately
-        }
+        ThreadUtils.shutdownExecutorServiceQuietly(commitExecutorService, 
timeoutMs, TimeUnit.MILLISECONDS);
     }
 
     public void schedule(final ConnectorTaskId id, final WorkerSourceTask 
workerTask) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index b43d9a7dc58..bc08c48322e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -247,20 +248,7 @@ public class Worker {
         connectorStatusMetricsGroup.close();
 
         workerConfigTransformer.close();
-        executor.shutdown();
-        try {
-            // Wait a while for existing tasks to terminate
-            if 
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
-                executor.shutdownNow(); //cancel current executing threads
-                // Wait a while for tasks to respond to being cancelled
-                if 
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
-                    log.error("Executor did not terminate in time");
-            }
-        } catch (InterruptedException e) {
-            executor.shutdownNow(); // (Re-)Cancel if current thread also 
interrupted
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
+        ThreadUtils.shutdownExecutorServiceQuietly(executor, 
EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     }
 
     /**
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 88f92f53f17..51258f655ec 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -804,22 +804,9 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
         stopping.set(true);
         member.wakeup();
-        herderExecutor.shutdown();
-        try {
-            if (!herderExecutor.awaitTermination(herderExecutorTimeoutMs(), 
TimeUnit.MILLISECONDS))
-                herderExecutor.shutdownNow();
-
-            forwardRequestExecutor.shutdown();
-            startAndStopExecutor.shutdown();
-
-            if 
(!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
-                forwardRequestExecutor.shutdownNow();
-            if 
(!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
-                startAndStopExecutor.shutdownNow();
-        } catch (InterruptedException e) {
-            // ignore
-        }
-
+        ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor, 
herderExecutorTimeoutMs(), TimeUnit.MILLISECONDS);
+        ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor, 
FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor, 
START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         log.info("Herder stopped");
         running = false;
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index b3c83e3dd78..b921305143a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.standalone;
 
+import org.apache.kafka.common.utils.ThreadUtils;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -105,14 +106,7 @@ public class StandaloneHerder extends AbstractHerder {
     @Override
     public synchronized void stop() {
         log.info("Herder stopping");
-        requestExecutorService.shutdown();
-        try {
-            if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS))
-                requestExecutorService.shutdownNow();
-        } catch (InterruptedException e) {
-            // ignore
-        }
-
+        ThreadUtils.shutdownExecutorServiceQuietly(requestExecutorService, 30, 
TimeUnit.SECONDS);
         // There's no coordination/hand-off to do here since this is all 
standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly 
checkpoint and shutdown all
         // the tasks.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index 60255f8ccd7..d29f41326cc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.storage;
 
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.common.utils.ThreadUtils;
@@ -61,17 +60,7 @@ public abstract class MemoryOffsetBackingStore implements 
OffsetBackingStore {
     @Override
     public void stop() {
         if (executor != null) {
-            executor.shutdown();
-            // Best effort wait for any get() and set() tasks (and caller's 
callbacks) to complete.
-            try {
-                executor.awaitTermination(30, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-            if (!executor.shutdownNow().isEmpty()) {
-                throw new ConnectException("Failed to stop 
MemoryOffsetBackingStore. Exiting without cleanly " +
-                        "shutting down pending tasks and/or callbacks.");
-            }
+            ThreadUtils.shutdownExecutorServiceQuietly(executor, 30, 
TimeUnit.SECONDS);
             executor = null;
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 9099de4c94d..6c3a85531cd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -99,7 +100,7 @@ public class SourceTaskOffsetCommitterTest {
         // Normal termination, where termination times out.
         when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenReturn(false);
 
-        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(ThreadUtils.class)) {
             committer.close(timeoutMs);
             assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("ERROR")));
         }

Reply via email to