This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new c17c9f0a6c5 KAFKA-15421: fix network thread leak in
testThreadPoolResize (#14320)
c17c9f0a6c5 is described below
commit c17c9f0a6c56e528c9ce769249198d088e49b894
Author: Luke Chen <[email protected]>
AuthorDate: Sun Sep 3 16:16:54 2023 +0800
KAFKA-15421: fix network thread leak in testThreadPoolResize (#14320)
In SocketServerTest, we create a SocketServer and call enableRequestProcessing
on each test class initialization. That's fine, since we shut it down in
@AfterEach. The issue is that 2 tests in this test suite are disabled. When
running these disabled tests, we still go through class initialization, but
without @AfterEach. That causes 2 network threads to leak.
For comparison, here is the error message from the
DynamicBrokerReconfigurationTest#testThreadPoolResize test:
org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8:
List(data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0,
data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0,
data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0,
data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0,
data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0,
data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0,
data-plane-kafka [...]
The 2 unexpected network threads are leaked from SocketServerTest.
Reviewers: Satish Duggana <[email protected]>, Christo Lolov
<[email protected]>, Divij Vaidya <[email protected]>, Kamal Chandraprakash
<[email protected]>, Chris Egerton <[email protected]>
---
.../integration/kafka/server/DynamicBrokerReconfigurationTest.scala | 1 -
core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 2 +-
.../java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java | 2 --
3 files changed, 1 insertion(+), 4 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fcbc7f6fe9e..c92287184a8 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -798,7 +798,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
consumer.commitSync()
}
- @Disabled
@Test
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "data-plane-kafka-request-handler-"
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 58ba9d3af7f..63759fae871 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -81,7 +81,6 @@ class SocketServerTest {
private val apiVersionManager = new
SimpleApiVersionManager(ListenerType.BROKER, true, false,
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String,
java.lang.Short], 0, true))
val server = new SocketServer(config, metrics, Time.SYSTEM,
credentialProvider, apiVersionManager)
- server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]
private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
@@ -94,6 +93,7 @@ class SocketServerTest {
@BeforeEach
def setUp(): Unit = {
+ server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
// Run the tests with TRACE logging to exercise request logging path
logLevelToRestore = kafkaLogger.getLevel
kafkaLogger.setLevel(Level.TRACE)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index c22fc29aeb8..bed5452bdf5 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -32,7 +32,6 @@ import
org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -155,7 +154,6 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
context = new TieredStorageTestContext(this);
}
- @Disabled
@Test
public void executeTieredStorageTest() {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder();