This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3404f65cdba KAFKA-19324 Make org.apache.kafka.common.test.TestUtils
package-private to prevent cross-module access (#19884)
3404f65cdba is described below
commit 3404f65cdbab68de23137603b7f95c1134a6a982
Author: Bolin Lin <[email protected]>
AuthorDate: Sun Jun 22 22:47:40 2025 +0800
KAFKA-19324 Make org.apache.kafka.common.test.TestUtils package-private to
prevent cross-module access (#19884)
Description
* Replace `org.apache.kafka.common.test.TestUtils` with
`org.apache.kafka.test.TestUtils` in modules outside the
`org.apache.kafka.common.test` package to standardize test utility usage
* Move the `waitUntilLeaderIsElectedOrChangedWithAdmin` method from
`org.apache.kafka.common.test.TestUtils` to `ClusterInstance` and refactor
it for better code organization
* Allow the `org.apache.kafka.test` package (which provides `TestUtils`) in
the `transaction-coordinator` import control
Reviewers: PoAn Yang [[email protected]](mailto:[email protected]), Ken
Huang [[email protected]](mailto:[email protected]), Ken Huang
[[email protected]](mailto:[email protected]), Chia-Ping Tsai
[[email protected]](mailto:[email protected])
---
.../import-control-transaction-coordinator.xml | 1 +
.../kafka/clients/ClientRebootstrapTest.java | 2 +-
.../org/apache/kafka/clients/ClientsTestUtils.java | 2 +-
.../clients/MetadataVersionIntegrationTest.java | 2 +-
.../kafka/clients/TransactionsExpirationTest.java | 2 +-
.../TransactionsWithMaxInFlightOneTest.java | 2 +-
.../kafka/clients/admin/DeleteTopicTest.java | 5 +-
.../clients/admin/ListOffsetsIntegrationTest.java | 2 +-
.../clients/consumer/ConsumerIntegrationTest.java | 2 +-
.../consumer/PlaintextConsumerPollTest.java | 2 +-
.../PlaintextConsumerSubscriptionTest.java | 2 +-
.../consumer/ShareConsumerRackAwareTest.java | 2 +-
.../producer/ProducerSendWhileDeletionTest.java | 2 +-
.../security/GroupAuthorizerIntegrationTest.java | 2 +-
.../server/quota/CustomQuotaCallbackTest.java | 2 +-
.../transaction/ProducerIntegrationTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 17 +++--
.../kafka/server/purgatory/DelayedFutureTest.java | 2 +-
.../apache/kafka/admin/RemoteTopicCrudTest.java | 8 ++-
.../apache/kafka/server/log/LogAppendTimeTest.java | 2 +-
.../apache/kafka/common/test/ClusterInstance.java | 48 ++++++++++++++
.../org/apache/kafka/common/test/TestUtils.java | 74 ++--------------------
.../junit/ClusterInstanceParameterResolver.java | 1 -
.../test/junit/RaftClusterInvocationContext.java | 33 +++++++++-
.../test/junit/ClusterTestExtensionsTest.java | 11 ++--
.../transaction/ProducerIdManagerTest.java | 2 +-
26 files changed, 128 insertions(+), 105 deletions(-)
diff --git a/checkstyle/import-control-transaction-coordinator.xml
b/checkstyle/import-control-transaction-coordinator.xml
index bf2157750c3..a48100a9acc 100644
--- a/checkstyle/import-control-transaction-coordinator.xml
+++ b/checkstyle/import-control-transaction-coordinator.xml
@@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.test.api" />
+ <allow pkg="org.apache.kafka.test" />
<allow pkg="org.slf4j" />
<subpackage name="generated">
<allow pkg="org.apache.kafka.common.protocol" />
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
index 2f11e13377d..b388744b711 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
@@ -24,11 +24,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.List;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
index efbfc884e72..0c99d51da88 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
index 4e44eef5a21..3d560892fe8 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
@@ -19,11 +19,11 @@ package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
import java.util.Map;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java
index 1c523a43554..01eca9afb97 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java
@@ -34,7 +34,6 @@ import
org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
@@ -47,6 +46,7 @@ import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
index fb27f9be4f0..d95901249c0 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -38,6 +37,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
index fceebc0c1b6..2792707465c 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
@@ -25,7 +25,6 @@ import
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -37,6 +36,7 @@ import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
import java.util.Collection;
@@ -202,6 +202,7 @@ public class DeleteTopicTest {
waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas
for topic " + DEFAULT_TOPIC + " not created.");
}
}
+
@ClusterTest
public void testDeleteNonExistingTopic(ClusterInstance cluster) throws
Exception {
try (Admin admin = cluster.admin()) {
@@ -220,7 +221,7 @@ public class DeleteTopicTest {
cluster.waitForTopic(topic, 0);
waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas
for topic test not created.");
- TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
DEFAULT_TOPIC, 0, 1000);
+ cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
DEFAULT_TOPIC, 0, 1000);
}
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
index 8487f0ccda5..678a3b23b46 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
@@ -28,12 +28,12 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 8a41c1a8beb..03a9b159e23 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -31,12 +31,12 @@ import
org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.Collection;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
index 20994134c5b..71853d97814 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
@@ -23,12 +23,12 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
index e8c32d37908..065e6600a54 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
@@ -22,11 +22,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 7628220bff2..8d6a8469694 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -26,11 +26,11 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.List;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 7ee809ca46b..6977be18fc9 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -36,6 +35,7 @@ import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
index a1eb4c4c027..d899e4faea9 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -44,7 +44,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -53,6 +52,7 @@ import
org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.test.TestUtils;
import java.net.InetAddress;
import java.time.Duration;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
index ee891b47826..d6e27bfdf53 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.test.TestUtils;
import java.util.LinkedHashMap;
import java.util.List;
diff --git
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
index 75b1bba0e12..ebc0e94f661 100644
---
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
@@ -31,10 +31,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest,
InitProducerIdResponse}
-import org.apache.kafka.common.test.{ClusterInstance, TestUtils}
+import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{Feature, MetadataVersion}
+import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf,
assertThrows, assertTrue}
import java.time.Duration
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 28ce04859a8..835fc093f0f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
-import org.apache.kafka.common.test.{TestUtils => JTestUtils}
import scala.jdk.CollectionConverters._
@@ -592,8 +591,17 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"plaintext://localhost:9091,SsL://localhost:9092")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
val config = KafkaConfig.fromProps(props)
- assertEquals(Some("SSL://localhost:9092"),
config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString))
- assertEquals(Some("PLAINTEXT://localhost:9091"),
config.listeners.find(_.listener ==
"PLAINTEXT").map(JTestUtils.endpointToString))
+ assertEndpointsEqual(new Endpoint("SSL", SecurityProtocol.SSL,
"localhost", 9092),
+ config.listeners.find(_.listener == "SSL").getOrElse(fail("SSL endpoint
not found")))
+ assertEndpointsEqual( new Endpoint("PLAINTEXT",
SecurityProtocol.PLAINTEXT, "localhost", 9091),
+ config.listeners.find(_.listener ==
"PLAINTEXT").getOrElse(fail("PLAINTEXT endpoint not found")))
+ }
+
+ private def assertEndpointsEqual(expected: Endpoint, actual: Endpoint): Unit
= {
+ assertEquals(expected.host(), actual.host(), "Host mismatch")
+ assertEquals(expected.port(), actual.port(), "Port mismatch")
+ assertEquals(expected.listener(), actual.listener(), "Listener mismatch")
+ assertEquals(expected.securityProtocol(), actual.securityProtocol(),
"Security protocol mismatch")
}
private def listenerListToEndPoints(listenerList: String,
@@ -1186,7 +1194,8 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(defaults)
assertEquals(1, config.brokerId)
- assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"),
config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
+ assertEndpointsEqual(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT,
"127.0.0.1", 1122),
+ config.effectiveAdvertisedBrokerListeners.head)
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3),
config.maxConnectionsPerIpOverrides)
assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
diff --git
a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
index d3faa193b4d..db3a74055a0 100644
---
a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
+++
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
diff --git
a/storage/src/test/java/org/apache/kafka/admin/RemoteTopicCrudTest.java
b/storage/src/test/java/org/apache/kafka/admin/RemoteTopicCrudTest.java
index 875e4bc7d21..3ba16c89655 100644
--- a/storage/src/test/java/org/apache/kafka/admin/RemoteTopicCrudTest.java
+++ b/storage/src/test/java/org/apache/kafka/admin/RemoteTopicCrudTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -41,6 +40,8 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
@@ -554,7 +555,7 @@ class RemoteTopicCrudTest {
}
private void verifyRemoteLogTopicConfigs(Map<String, String> topicConfig)
throws Exception {
- TestUtils.waitForCondition(() -> {
+ TestCondition condition = () -> {
var logBuffer = cluster.brokers().values()
.stream()
.map(broker -> broker.logManager().getLog(new
TopicPartition(testTopicName, 0), false))
@@ -613,7 +614,8 @@ class RemoteTopicCrudTest {
}
}
return result;
- }, "Failed to update topic config $topicConfig" + topicConfig);
+ };
+ TestUtils.waitForCondition(condition, "Failed to update topic config
$topicConfig" + topicConfig);
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
index dd5293461f8..7269b2e4b5c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
@@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 676adbf21e8..72dcda8c65f 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -36,10 +36,13 @@ import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -65,6 +68,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -407,10 +411,54 @@ public interface ClusterInstance {
}
}
+ /**
+ * Wait for a leader to be elected or changed using the provided admin
client.
+ */
+ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+ String topic,
+ int partitionNumber,
+ long timeoutMs)
throws Exception {
+ long startTime = System.currentTimeMillis();
+ TopicPartition topicPartition = new TopicPartition(topic,
partitionNumber);
+
+ while (System.currentTimeMillis() < startTime + timeoutMs) {
+ try {
+ TopicDescription topicDescription =
admin.describeTopics(List.of(topic))
+ .allTopicNames().get().get(topic);
+
+ Optional<Integer> leader =
topicDescription.partitions().stream()
+ .filter(partitionInfo -> partitionInfo.partition() ==
partitionNumber)
+ .findFirst()
+ .map(partitionInfo -> {
+ int leaderId = partitionInfo.leader().id();
+ return leaderId == Node.noNode().id() ? null :
leaderId;
+ });
+
+ if (leader.isPresent()) {
+ return leader.get();
+ }
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof UnknownTopicOrPartitionException ||
+ cause instanceof LeaderNotAvailableException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMs));
+ }
+
+ throw new AssertionError("Timing out after " + timeoutMs +
+ " ms since a leader was not elected for partition " +
topicPartition);
+ }
+
default List<Integer> boundPorts() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.map(s -> s.boundPort(clientListener()))
.toList();
+
}
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 158a93b8a77..4c75272edd4 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -16,13 +16,7 @@
*/
package org.apache.kafka.common.test;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.Endpoint;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -32,18 +26,17 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.List;
-import java.util.Optional;
import java.util.Random;
-import java.util.function.BiFunction;
import java.util.function.Supplier;
import static java.lang.String.format;
/**
- * Helper functions for writing unit tests
+ * Helper functions for writing unit tests.
+ * <p>
+ * <b>Package-private:</b> Not intended for use outside {@code
org.apache.kafka.common.test}.
*/
-public class TestUtils {
+class TestUtils {
private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
/* A consistent random number generator to make tests repeatable */
@@ -103,18 +96,6 @@ public class TestUtils {
return file;
}
- /**
- * Convert EndPoint to String
- */
- public static String endpointToString(Endpoint endPoint) {
- String host = endPoint.host();
- int port = endPoint.port();
- ListenerName listenerName =
ListenerName.normalised(endPoint.listener());
-
- String hostport = (host == null) ? (":" + port) :
Utils.formatAddress(host, port);
- return listenerName.value() + "://" + hostport;
- }
-
/**
* uses default value of 15 seconds for timeout
*/
@@ -164,51 +145,4 @@ public class TestUtils {
String conditionDetails) throws
InterruptedException {
waitForCondition(testCondition, maxWaitMs, () -> conditionDetails);
}
-
- public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
- String topic,
- int
partitionNumber,
- long
timeoutMs) throws Exception {
- BiFunction<String, Integer, Optional<Integer>> getPartitionLeader =
(t, p) -> {
- try {
- return Optional.ofNullable(getLeaderFromAdmin(admin, t, p));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic,
partitionNumber, timeoutMs);
- }
-
- private static Integer getLeaderFromAdmin(Admin admin, String topic, int
partition) throws Exception {
- TopicDescription topicDescription =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
- return topicDescription.partitions().stream()
- .filter(partitionInfo -> partitionInfo.partition() == partition)
- .findFirst()
- .map(partitionInfo -> partitionInfo.leader().id() ==
Node.noNode().id() ? null : partitionInfo.leader().id())
- .orElse(null);
- }
-
- private static int doWaitUntilLeaderIsElectedOrChanged(BiFunction<String,
Integer, Optional<Integer>> getPartitionLeader,
- String topic,
- int partition,
- long timeoutMs)
throws Exception {
- long startTime = System.currentTimeMillis();
- TopicPartition topicPartition = new TopicPartition(topic, partition);
- Optional<Integer> electedLeader = Optional.empty();
-
- while (electedLeader.isEmpty() && System.currentTimeMillis() <
startTime + timeoutMs) {
- Optional<Integer> leader = getPartitionLeader.apply(topic,
partition);
- if (leader.isPresent()) {
- log.trace("Leader {} is elected for partition {}",
leader.get(), topicPartition);
- electedLeader = leader;
- } else {
- log.trace("Leader for partition {} is not elected yet",
topicPartition);
- }
- Thread.sleep(Math.min(timeoutMs, 100L));
- }
-
- Optional<Integer> finalLeader = electedLeader;
- return electedLeader.orElseThrow(() -> new AssertionError("Timing out
after " + timeoutMs
- + " ms since a leader was not elected for partition " +
topicPartition + ", leader is " + finalLeader));
- }
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterInstanceParameterResolver.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterInstanceParameterResolver.java
index 5276ff1ad1a..27c03d1e531 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterInstanceParameterResolver.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterInstanceParameterResolver.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.test.junit;
-
import org.apache.kafka.common.test.ClusterInstance;
import org.junit.jupiter.api.TestTemplate;
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 011b0978b17..842c4cf4fbb 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
@@ -50,6 +49,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -71,6 +71,35 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
private final boolean isCombined;
+ /**
+ * Wait for condition to be met for at most 15 seconds and throw assertion failure otherwise.
+ * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
+ * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
+ * avoid transient failures due to slow or overloaded machines.
+ * <p>
+ * The condition is polled in a loop with a 100 ms sleep between evaluations. An {@code Exception} thrown by the
+ * condition before the deadline is caught and treated as "not met yet"; the loop simply retries.
+ * <p>
+ * NOTE(review): exceptions caught before the deadline are discarded, and the final "Condition not met"
+ * {@code AssertionError} is created without a cause, so a condition that keeps throwing will usually time out
+ * without reporting why. Consider remembering the last exception and attaching it as the cause.
+ *
+ * @param testCondition supplier that is polled until it returns {@code true}
+ * @param conditionDetails text appended to the "Condition not met: " failure message
+ * @throws InterruptedException if the thread is interrupted during the sleep between polls
+ * @throws AssertionError if the condition has not returned {@code true} when the 15 second deadline passes
+ */
+ static void waitForCondition(final java.util.function.Supplier<Boolean>
testCondition,
+ final String conditionDetails) throws
InterruptedException {
+ var maxWaitMs = 15_000L;
+ long endTime = System.currentTimeMillis() + maxWaitMs;
+
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ if (testCondition.get()) {
+ return;
+ }
+ } catch (Exception e) {
+ if (System.currentTimeMillis() >= endTime) {
+ throw new AssertionError(String.format("Assertion failed
with an exception after %s ms", maxWaitMs), e);
+ }
+ }
+
+ if (System.currentTimeMillis() < endTime) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
+ throw new AssertionError("Condition not met: " + conditionDetails);
+ }
+
public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig
clusterConfig, boolean isCombined) {
this.baseDisplayName = baseDisplayName;
this.clusterConfig = clusterConfig;
@@ -180,7 +209,7 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
format();
if (started.compareAndSet(false, true)) {
clusterTestKit.startup();
- TestUtils.waitForCondition(
+ waitForCondition(
() ->
this.clusterTestKit.brokers().values().stream().allMatch(
brokers -> brokers.brokerState() ==
BrokerState.RUNNING
), "Broker never made it to RUNNING state.");
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index 728eee67639..56da0e86d51 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -44,7 +44,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.JaasUtils;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.AutoStart;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
@@ -302,7 +301,7 @@ public class ClusterTestExtensionsTest {
producer.flush();
consumer.subscribe(List.of(topic));
List<ConsumerRecord<String, String>> records = new ArrayList<>();
- TestUtils.waitForCondition(() -> {
+ RaftClusterInvocationContext.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
@@ -330,7 +329,7 @@ public class ClusterTestExtensionsTest {
producer.flush();
consumer.subscribe(List.of(topic));
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- TestUtils.waitForCondition(() -> {
+ RaftClusterInvocationContext.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
@@ -407,7 +406,7 @@ public class ClusterTestExtensionsTest {
}
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer()) {
consumer.subscribe(List.of(topic));
- TestUtils.waitForCondition(() -> {
+ RaftClusterInvocationContext.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
return records.count() == 1;
}, "Failed to receive message");
@@ -438,7 +437,7 @@ public class ClusterTestExtensionsTest {
try (Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(nonAdminConfig)) {
consumer.subscribe(List.of(topic));
AtomicBoolean hasException = new AtomicBoolean(false);
- TestUtils.waitForCondition(() -> {
+ RaftClusterInvocationContext.waitForCondition(() -> {
if (hasException.get()) {
return true;
}
@@ -476,7 +475,7 @@ public class ClusterTestExtensionsTest {
try (Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(unknownUserConfig)) {
consumer.subscribe(List.of(topic));
AtomicBoolean hasException = new AtomicBoolean(false);
- TestUtils.waitForCondition(() -> {
+ RaftClusterInvocationContext.waitForCondition(() -> {
if (hasException.get()) {
return true;
}
diff --git
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
index 9cc328dc6bc..72aba6b407f 100644
---
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
+++
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
@@ -20,11 +20,11 @@ import
org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
-import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;