This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 86daf8ce657 KAFKA-14913: Using
ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect
Runtime (#13594)
86daf8ce657 is described below
commit 86daf8ce6573eb79d6e78381dbab738055f914c4
Author: vamossagar12 <[email protected]>
AuthorDate: Mon May 8 20:09:47 2023 +0530
KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close
executors in Connect Runtime (#13594)
#13557 introduced a utils method to close executors silently. This PR
leverages that method to close executors in connect runtime. There was
duplicate code while closing the executors which isn't the case with this PR.
Note that there are a few more executors used in Connect runtime but their
close methods don't follow this pattern of shutdown, await and shutdown. Some
of them have some logic like executor like Worker, so not changing at such
places.
---------
Co-authored-by: Sagar Rao <[email protected]>
Reviewers: Daniel Urban <[email protected]>, Yash Mayya
<[email protected]>, Viktor Somogyi-Vass <[email protected]>
---
.../org/apache/kafka/common/utils/ThreadUtils.java | 21 ++++++++++++++++++---
.../connect/runtime/SourceTaskOffsetCommitter.java | 9 +--------
.../org/apache/kafka/connect/runtime/Worker.java | 16 ++--------------
.../runtime/distributed/DistributedHerder.java | 19 +++----------------
.../runtime/standalone/StandaloneHerder.java | 10 ++--------
.../connect/storage/MemoryOffsetBackingStore.java | 13 +------------
.../runtime/SourceTaskOffsetCommitterTest.java | 3 ++-
7 files changed, 29 insertions(+), 62 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
index 19655538297..40ac9443cf2 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java
@@ -17,6 +17,9 @@
package org.apache.kafka.common.utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -26,6 +29,8 @@ import java.util.concurrent.atomic.AtomicLong;
* Utilities for working with threads.
*/
public class ThreadUtils {
+
+ private static final Logger log =
LoggerFactory.getLogger(ThreadUtils.class);
/**
* Create a new ThreadFactory.
*
@@ -56,20 +61,30 @@ public class ThreadUtils {
}
/**
- * Shuts down an executor service with a timeout. After the timeout/on
interrupt, the service is forcefully closed.
+ * Shuts down an executor service in two phases, first by calling shutdown
to reject incoming tasks,
+ * and then calling shutdownNow, if necessary, to cancel any lingering
tasks.
+ * After the timeout/on interrupt, the service is forcefully closed.
* @param executorService The service to shut down.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService
executorService,
long timeout, TimeUnit
timeUnit) {
- executorService.shutdown();
+ executorService.shutdown(); // Disable new tasks from being submitted
try {
+ // Wait a while for existing tasks to terminate
if (!executorService.awaitTermination(timeout, timeUnit)) {
- executorService.shutdownNow();
+ executorService.shutdownNow(); // Cancel currently executing
tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!executorService.awaitTermination(timeout, timeUnit)) {
+ log.error("Executor {} did not terminate in time",
executorService);
+ }
}
} catch (InterruptedException e) {
+ // (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index 3aa85aa386f..d32c768a7fd 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -68,14 +68,7 @@ class SourceTaskOffsetCommitter {
}
public void close(long timeoutMs) {
- commitExecutorService.shutdown();
- try {
- if (!commitExecutorService.awaitTermination(timeoutMs,
TimeUnit.MILLISECONDS)) {
- log.error("Graceful shutdown of offset commitOffsets thread
timed out.");
- }
- } catch (InterruptedException e) {
- // ignore and allow to exit immediately
- }
+ ThreadUtils.shutdownExecutorServiceQuietly(commitExecutorService,
timeoutMs, TimeUnit.MILLISECONDS);
}
public void schedule(final ConnectorTaskId id, final WorkerSourceTask
workerTask) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index b43d9a7dc58..bc08c48322e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@@ -247,20 +248,7 @@ public class Worker {
connectorStatusMetricsGroup.close();
workerConfigTransformer.close();
- executor.shutdown();
- try {
- // Wait a while for existing tasks to terminate
- if
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
- executor.shutdownNow(); //cancel current executing threads
- // Wait a while for tasks to respond to being cancelled
- if
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
- log.error("Executor did not terminate in time");
- }
- } catch (InterruptedException e) {
- executor.shutdownNow(); // (Re-)Cancel if current thread also
interrupted
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
+ ThreadUtils.shutdownExecutorServiceQuietly(executor,
EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
/**
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 88f92f53f17..51258f655ec 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -804,22 +804,9 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
stopping.set(true);
member.wakeup();
- herderExecutor.shutdown();
- try {
- if (!herderExecutor.awaitTermination(herderExecutorTimeoutMs(),
TimeUnit.MILLISECONDS))
- herderExecutor.shutdownNow();
-
- forwardRequestExecutor.shutdown();
- startAndStopExecutor.shutdown();
-
- if
(!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
- forwardRequestExecutor.shutdownNow();
- if
(!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
- startAndStopExecutor.shutdownNow();
- } catch (InterruptedException e) {
- // ignore
- }
-
+ ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor,
herderExecutorTimeoutMs(), TimeUnit.MILLISECONDS);
+ ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor,
FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor,
START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("Herder stopped");
running = false;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index b3c83e3dd78..b921305143a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.standalone;
+import org.apache.kafka.common.utils.ThreadUtils;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
@@ -105,14 +106,7 @@ public class StandaloneHerder extends AbstractHerder {
@Override
public synchronized void stop() {
log.info("Herder stopping");
- requestExecutorService.shutdown();
- try {
- if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS))
- requestExecutorService.shutdownNow();
- } catch (InterruptedException e) {
- // ignore
- }
-
+ ThreadUtils.shutdownExecutorServiceQuietly(requestExecutorService, 30,
TimeUnit.SECONDS);
// There's no coordination/hand-off to do here since this is all
standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly
checkpoint and shutdown all
// the tasks.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index 60255f8ccd7..d29f41326cc 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.storage;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.common.utils.ThreadUtils;
@@ -61,17 +60,7 @@ public abstract class MemoryOffsetBackingStore implements
OffsetBackingStore {
@Override
public void stop() {
if (executor != null) {
- executor.shutdown();
- // Best effort wait for any get() and set() tasks (and caller's
callbacks) to complete.
- try {
- executor.awaitTermination(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!executor.shutdownNow().isEmpty()) {
- throw new ConnectException("Failed to stop
MemoryOffsetBackingStore. Exiting without cleanly " +
- "shutting down pending tasks and/or callbacks.");
- }
+ ThreadUtils.shutdownExecutorServiceQuietly(executor, 30,
TimeUnit.SECONDS);
executor = null;
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 9099de4c94d..6c3a85531cd 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -99,7 +100,7 @@ public class SourceTaskOffsetCommitterTest {
// Normal termination, where termination times out.
when(executor.awaitTermination(timeoutMs,
TimeUnit.MILLISECONDS)).thenReturn(false);
- try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
+ try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(ThreadUtils.class)) {
committer.close(timeoutMs);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("ERROR")));
}