[
https://issues.apache.org/jira/browse/KAFKA-17162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax reassigned KAFKA-17162:
---------------------------------------
Assignee: Matthias J. Sax
> DefaultTaskManagerTest may leak AwaitingRunnable thread
> -------------------------------------------------------
>
> Key: KAFKA-17162
> URL: https://issues.apache.org/jira/browse/KAFKA-17162
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 3.9.0
> Reporter: Ao Li
> Assignee: Matthias J. Sax
> Priority: Minor
>
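> `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` fails when the
> following patch is applied. The patch adds delays around the await call and
> asserts that `awaitingThread` has terminated after shutdown: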
> {code}
> ```
> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> index 5d2db3c279..b87a82b85b 100644
> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> @@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
>      }
>
>      private <T> T returnWithTasksLocked(final Supplier<T> action) {
> +        try {
> +            Thread.sleep(1000);
> +        } catch (final Exception e) {
> +        }
>          tasksLock.lock();
>          try {
>              return action.get();
> diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> index 98065eae7d..0d8dde7156 100644
> --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> @@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
>          @Override
>          public void run() {
>              while (!shutdownRequested.get()) {
> +                try {
> +                    Thread.sleep(1000);
> +                } catch (final Exception e) {
> +                }
>                  try {
>                      taskManager.awaitProcessableTasks();
>                  } catch (final InterruptedException ignored) {
> @@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
>          assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
>
>          awaitingRunnable.shutdown();
> +        Thread.sleep(5000);
> +        assertFalse(awaitingThread.isAlive());
>      }
>
>      @Test
> ```
> {code}
> `awaitingThread` is left running because it is still parked, waiting for a
> signal on the condition:
> {code}
> "Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s tid=0x00000001250d8600 nid=26371 waiting on condition [0x0000000173d4e000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
>     - parking to wait for <0x00000007dcd49b88> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:371)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block([email protected]/AbstractQueuedSynchronizer.java:519)
>     at java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3780)
>     at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3725)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:1707)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown Source)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
>     at java.lang.Thread.runWith([email protected]/Thread.java:1596)
>     at java.lang.Thread.run([email protected]/Thread.java:1583)
> {code}
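
The pattern in the report (a shutdown flag checked outside the lock, followed
by a delay, followed by an untimed wait on a condition) can be reproduced
outside Kafka. The following is a minimal sketch, not Kafka code. The names
`AwaitLeakSketch`, `Worker`, and `leaksThread` are hypothetical, and it
assumes that shutdown sets a flag and signals the condition exactly once,
which may not match what `AwaitingRunnable#shutdown` actually does. If the
signal fires while the worker is between the flag check and the wait, the
signal is lost and the worker parks with nothing left to wake it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these types exist in Kafka.
final class AwaitLeakSketch {

    static final class Worker implements Runnable {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition wakeup = lock.newCondition();
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        // Lets the caller know the worker has already passed the flag check.
        final CountDownLatch passedCheck = new CountDownLatch(1);
        private final boolean recheckUnderLock;

        Worker(final boolean recheckUnderLock) {
            this.recheckUnderLock = recheckUnderLock;
        }

        @Override
        public void run() {
            while (!shutdownRequested.get()) {
                passedCheck.countDown();
                try {
                    // Widens the window between the flag check and the wait,
                    // similar in spirit to the sleeps in the patch above.
                    Thread.sleep(200);
                    lock.lock();
                    try {
                        // Guarded wait: re-check the flag while holding the lock.
                        if (recheckUnderLock && shutdownRequested.get()) {
                            return;
                        }
                        wakeup.await(); // parks until signalled or interrupted
                    } finally {
                        lock.unlock();
                    }
                } catch (final InterruptedException ignored) {
                    // Fall through and re-check the shutdown flag.
                }
            }
        }

        // Assumed shutdown behaviour: set the flag, then signal once.
        void shutdown() {
            shutdownRequested.set(true);
            lock.lock();
            try {
                wakeup.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    // Returns true if the worker thread is still alive 2 seconds after shutdown.
    static boolean leaksThread(final boolean recheckUnderLock) throws InterruptedException {
        final Worker worker = new Worker(recheckUnderLock);
        final Thread thread = new Thread(worker);
        thread.setDaemon(true); // a leaked thread must not keep the JVM alive
        thread.start();
        worker.passedCheck.await(); // worker is now past the flag check
        worker.shutdown();          // runs while the worker is still sleeping
        thread.join(2000);
        return thread.isAlive();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("unguarded wait leaks thread: " + leaksThread(false));
        System.out.println("guarded wait leaks thread:   " + leaksThread(true));
    }
}
```

On a typical JVM the unguarded variant should leave the thread parked and the
guarded variant should let it exit, although a spurious wakeup could in
principle end the unguarded worker as well. In a test, joining the thread
with a timeout and asserting that it is no longer alive, as the patch does
with `assertFalse(awaitingThread.isAlive())`, makes this kind of leak visible.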
--
This message was sent by Atlassian Jira
(v8.20.10#820010)