This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41442903352 MINOR: Cleanup metadata module (#18937)
41442903352 is described below
commit 41442903352930c30db5569b7f9ba2a4e7897a11
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Mar 31 14:46:21 2025 +0500
MINOR: Cleanup metadata module (#18937)
Removed unused code and fixed IDEA warnings.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/metadata/ScramPublisher.scala | 2 +-
.../org/apache/kafka/controller/BrokersToIsrs.java | 4 -
.../kafka/controller/ClusterControlManager.java | 2 +-
.../kafka/controller/OffsetControlManager.java | 5 --
.../apache/kafka/controller/QuorumController.java | 10 ---
.../apache/kafka/controller/QuorumFeatures.java | 4 -
.../controller/ReplicationControlManager.java | 5 +-
.../controller/errors/ControllerExceptions.java | 15 ----
.../java/org/apache/kafka/image/ClusterImage.java | 4 -
.../java/org/apache/kafka/image/TopicDelta.java | 14 ----
.../kafka/image/publisher/SnapshotGenerator.java | 17 +----
.../org/apache/kafka/metadata/LeaderAndIsr.java | 21 -----
.../kafka/metadata/PartitionRegistration.java | 7 --
.../apache/kafka/metadata/ScramCredentialData.java | 2 +-
.../org/apache/kafka/metadata/VersionRange.java | 7 --
.../kafka/metadata/util/SnapshotFileReader.java | 4 -
.../ConfigurationControlManagerTest.java | 8 +-
.../controller/ProducerIdControlManagerTest.java | 3 +-
.../QuorumControllerIntegrationTestUtils.java | 4 -
.../kafka/controller/QuorumControllerTest.java | 14 ----
.../controller/ReplicationControlManagerTest.java | 16 ++--
.../image/publisher/SnapshotGeneratorTest.java | 28 +++----
.../org/apache/kafka/metadata/RecordTestUtils.java | 89 ----------------------
.../authorizer/StandardAuthorizerTest.java | 3 -
.../org/apache/kafka/metalog/LocalLogManager.java | 51 -------------
.../kafka/metalog/LocalLogManagerTestEnv.java | 20 -----
26 files changed, 35 insertions(+), 324 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
index 09789249571..818e01fa5f8 100644
--- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
@@ -56,7 +56,7 @@ class ScramPublisher(
userChanges.forEach {
case (userName, change) =>
if (change.isPresent) {
- credentialProvider.updateCredential(mechanism, userName,
change.get().toCredential(mechanism))
+ credentialProvider.updateCredential(mechanism, userName,
change.get().toCredential)
} else {
credentialProvider.removeCredentials(mechanism, userName)
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 24d3ed62b02..649fed99e1f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -278,10 +278,6 @@ public class BrokersToIsrs {
return iterator(NO_LEADER, true);
}
- PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
- return iterator(brokerId, true);
- }
-
PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
return iterator(brokerId, false);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index dd50b45628a..c9064f991f0 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -521,7 +521,7 @@ public class ClusterControlManager {
* @param brokerId The broker id to track.
* @param brokerEpoch The broker epoch to track.
*
- * @returns True only if the ClusterControlManager is active.
+ * @return True only if the ClusterControlManager is active.
*/
boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
BrokerHeartbeatManager manager = heartbeatManager;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 99375af6141..5c496877747 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -418,9 +418,4 @@ class OffsetControlManager {
log.info("Replayed {} at offset {}. Reverted to offset {}.",
message, offset, preTransactionOffset);
}
-
- // VisibleForTesting
- void setNextWriteOffset(long newNextWriteOffset) {
- this.nextWriteOffset = newNextWriteOffset;
- }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 56e3848ed32..bf2e26834ca 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -177,11 +177,6 @@ public final class QuorumController implements Controller {
*/
private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;
- /**
- * The default minimum event time that can be logged as a slow event.
- */
- private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;
-
/**
* The maximum records any user-initiated operation is allowed to generate.
*
@@ -677,11 +672,6 @@ public final class QuorumController implements Controller {
}
}
- // Visible for testing
- OffsetControlManager offsetControl() {
- return offsetControl;
- }
-
// Visible for testing
ReplicationControlManager replicationControl() {
return replicationControl;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 7817fdc3ee0..e54f0604242 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -84,10 +84,6 @@ public final class QuorumFeatures {
return nodeId;
}
- public Map<String, VersionRange> localSupportedFeatures() {
- return localSupportedFeatures;
- }
-
public List<Integer> quorumNodeIds() {
return quorumNodeIds;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 0b2c8ed544b..22c8de053a0 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -2071,9 +2071,8 @@ public class ReplicationControlManager {
alterPartitionReassignment(topic.name(), partition,
records, allowRFChange);
successfulAlterations++;
} catch (Throwable e) {
- log.info("Unable to alter partition reassignment for " +
- topic.name() + ":" + partition.partitionIndex() + "
because " +
- "of an " + e.getClass().getSimpleName() + " error: " +
e.getMessage());
+ log.info("Unable to alter partition reassignment for {}:{}
because of an {} error: {}",
+ topic.name(), partition.partitionIndex(),
e.getClass().getSimpleName(), e.getMessage());
error = ApiError.fromThrowable(e);
}
totalAlterations++;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
index 3c742749383..15b2940784d 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
@@ -40,21 +40,6 @@ public class ControllerExceptions {
return exception instanceof TimeoutException;
}
- /**
- * Check if an exception is a NotController exception.
- *
- * @param exception The exception to check.
- * @return True if the exception is a NotController exception.
- */
- public static boolean isNotControllerException(Throwable exception) {
- if (exception == null) return false;
- if (exception instanceof ExecutionException) {
- exception = exception.getCause();
- if (exception == null) return false;
- }
- return exception instanceof NotControllerException;
- }
-
/**
* Create a new exception indicating that the controller is in
pre-migration mode, so the
* operation cannot be completed.
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 713e62fadc6..22166f1c2a3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -66,10 +66,6 @@ public final class ClusterImage {
return controllers;
}
- public boolean containsBroker(int brokerId) {
- return brokers.containsKey(brokerId);
- }
-
public long brokerEpoch(int brokerId) {
BrokerRegistration brokerRegistration = broker(brokerId);
if (brokerRegistration == null) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 00e4a422341..fdb26e8e7df 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -129,20 +129,6 @@ public final class TopicDelta {
return new TopicImage(image.name(), image.id(), newPartitions);
}
- public boolean hasPartitionsWithAssignmentChanges() {
- for (Entry<Integer, PartitionRegistration> entry :
partitionChanges.entrySet()) {
- int partitionId = entry.getKey();
- // New Partition.
- if (!image.partitions().containsKey(partitionId))
- return true;
- PartitionRegistration previousPartition =
image.partitions().get(partitionId);
- PartitionRegistration currentPartition = entry.getValue();
- if (!previousPartition.hasSameAssignment(currentPartition))
- return true;
- }
- return false;
- }
-
/**
* Find the partitions that have change based on the replica given.
* <p>
diff --git
a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
index 501fb2ee4d3..a8483f8f8b3 100644
---
a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
+++
b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -23,7 +23,6 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.fault.FaultHandler;
@@ -217,28 +216,20 @@ public class SnapshotGenerator implements
MetadataPublisher {
) {
switch (manifest.type()) {
case LOG_DELTA:
- publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+ publishLogDelta(newImage, (LogDeltaManifest) manifest);
break;
case SNAPSHOT:
- publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+ publishSnapshot(newImage);
break;
}
}
- void publishSnapshot(
- MetadataDelta delta,
- MetadataImage newImage,
- SnapshotManifest manifest
- ) {
+ void publishSnapshot(MetadataImage newImage) {
log.debug("Resetting the snapshot counters because we just read {}.",
newImage.provenance().snapshotName());
resetSnapshotCounters();
}
- void publishLogDelta(
- MetadataDelta delta,
- MetadataImage newImage,
- LogDeltaManifest manifest
- ) {
+ void publishLogDelta(MetadataImage newImage, LogDeltaManifest manifest) {
bytesSinceLastSnapshot += manifest.numBytes();
if (bytesSinceLastSnapshot >= maxBytesSinceLastSnapshot) {
if (eventQueue.isEmpty()) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
index 615582870be..1a818dfca58 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
@@ -26,9 +26,6 @@ public class LeaderAndIsr {
public static final int INITIAL_LEADER_EPOCH = 0;
public static final int INITIAL_PARTITION_EPOCH = 0;
public static final int NO_LEADER = -1;
- public static final int NO_EPOCH = -1;
- public static final int LEADER_DURING_DELETE = -2;
- public static final int EPOCH_DURING_DELETE = -2;
private final int leader;
private final int leaderEpoch;
@@ -74,10 +71,6 @@ public class LeaderAndIsr {
this.partitionEpoch = partitionEpoch;
}
- public static LeaderAndIsr duringDelete(List<Integer> isr) {
- return new LeaderAndIsr(LEADER_DURING_DELETE, isr);
- }
-
public int leader() {
return leader;
}
@@ -132,20 +125,6 @@ public class LeaderAndIsr {
.toList();
}
- public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) {
- if (this == other) {
- return true;
- } else if (other == null) {
- return false;
- } else {
- return leader == other.leader &&
- leaderEpoch == other.leaderEpoch &&
- isrWithBrokerEpoch.equals(other.isrWithBrokerEpoch) &&
- leaderRecoveryState == other.leaderRecoveryState &&
- partitionEpoch <= other.partitionEpoch;
- }
- }
-
@Override
public String toString() {
return "LeaderAndIsr(" +
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 808c9809352..a8f9e166e1f 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -470,11 +470,4 @@ public class PartitionRegistration {
", partitionEpoch=" + partitionEpoch +
")";
}
-
- public boolean hasSameAssignment(PartitionRegistration registration) {
- return Arrays.equals(this.replicas, registration.replicas) &&
- Arrays.equals(this.directories, registration.directories) &&
- Arrays.equals(this.addingReplicas, registration.addingReplicas) &&
- Arrays.equals(this.removingReplicas,
registration.removingReplicas);
- }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
index c356ff0f047..4339a572b71 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
@@ -86,7 +86,7 @@ public final class ScramCredentialData {
setIterations(iterations);
}
- public ScramCredential toCredential(ScramMechanism mechanism) {
+ public ScramCredential toCredential() {
return new ScramCredential(salt, storedKey, serverKey, iterations);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
index b9ced14c488..47e0c74b850 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
@@ -56,13 +56,6 @@ public class VersionRange {
return version >= min && version <= max;
}
- /**
- * Check if a given version range has overlap with this one
- */
- public boolean intersects(VersionRange other) {
- return other.min <= max && other.max >= min;
- }
-
@Override
public int hashCode() {
return Objects.hash(min, max);
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 2fddf190989..359cfe4534e 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -208,8 +208,4 @@ public final class SnapshotFileReader implements
AutoCloseable {
beginShutdown("closing");
queue.close();
}
-
- public CompletableFuture<Void> caughtUpFuture() {
- return caughtUpFuture;
- }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index cd2b3fa18c3..6954487c688 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -94,8 +94,6 @@ public class ConfigurationControlManagerTest {
static {
SYNONYMS.put("abc", List.of(new ConfigSynonym("foo.bar")));
SYNONYMS.put("def", List.of(new ConfigSynonym("baz")));
- SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
- List.of(new
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
SYNONYMS.put("quuux", List.of(new ConfigSynonym("quux",
HOURS_TO_MILLISECONDS)));
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, List.of(new
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
}
@@ -130,7 +128,7 @@ public class ConfigurationControlManagerTest {
}
@Test
- public void testReplay() throws Exception {
+ public void testReplay() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
@@ -314,7 +312,7 @@ public class ConfigurationControlManagerTest {
}
@Override
- public void close() throws Exception {
+ public void close() {
// nothing to do
}
@@ -379,7 +377,7 @@ public class ConfigurationControlManagerTest {
}
@Override
- public void close() throws Exception {
+ public void close() {
// empty
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 95a1d3a3020..2fd589cdeed 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -154,12 +154,11 @@ public class ProducerIdControlManagerTest {
assertEquals(new ProducerIdsBlock(3, 100000, 1000),
producerIdControlManager.nextProducerBlock());
}
- static ProducerIdsBlock generateProducerIds(
+ static void generateProducerIds(
ProducerIdControlManager producerIdControlManager, int brokerId,
long brokerEpoch) {
ControllerResult<ProducerIdsBlock> result =
producerIdControlManager.generateNextProducerId(brokerId,
brokerEpoch);
result.records().forEach(apiMessageAndVersion ->
producerIdControlManager.replay((ProducerIdsRecord)
apiMessageAndVersion.message()));
- return result.response();
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index 0157436ffb6..e37a9634a8d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -53,10 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class QuorumControllerIntegrationTestUtils {
private static final Logger log =
LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
- BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
- return brokerFeatures(MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestTesting());
- }
-
/**
* Create a broker features collection for use in a registration request.
We only set MV. here.
*
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4eac17d924d..d2265a846c7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,7 +97,6 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import
org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
-import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
@@ -144,7 +143,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static java.util.function.Function.identity;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -1409,11 +1407,6 @@ public class QuorumControllerTest {
}
}
- private static final Uuid FOO_ID =
Uuid.fromString("igRktLOnR8ektWHr79F8mw");
-
- private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
- IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(),
__ -> 0L));
-
@Test
public void testFatalMetadataReplayErrorOnActive() throws Throwable {
try (
@@ -1476,13 +1469,6 @@ public class QuorumControllerTest {
}
}
- private static void
assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
- for (int i = 0; i < authorizers.size(); i++) {
- assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
- "authorizer " + i + " should not have completed loading.");
- }
- }
-
static class InitialSnapshot implements AutoCloseable {
File tempDir;
BatchFileWriter writer;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 239c48e2069..1b5bf4fb29d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -3241,7 +3241,7 @@ public class ReplicationControlManagerTest {
Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2},
new int[]{1, 2}}).topicId();
Uuid topicC = ctx.createTestTopic("c", new int[][]{new
int[]{2}}).topicId();
- ControllerResult<AssignReplicasToDirsResponseData> controllerResult =
ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
+ ControllerResult<AssignReplicasToDirsResponseData> controllerResult =
ctx.assignReplicasToDirs(1, new HashMap<>() {{
put(new TopicIdPartition(topicA, 0), dir1b1);
put(new TopicIdPartition(topicA, 1), dir2b1);
put(new TopicIdPartition(topicA, 2), offlineDir); //
unknown/offline dir
@@ -3252,21 +3252,21 @@ public class ReplicationControlManagerTest {
put(new TopicIdPartition(topicC, 0), dir1b1); // expect
NOT_LEADER_OR_FOLLOWER
}});
-
assertEquals(AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short)
0, 0, new HashMap<Uuid, Map<TopicIdPartition, Errors>>() {{
- put(dir1b1, new HashMap<TopicIdPartition, Errors>() {{
+
assertEquals(AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short)
0, 0, new HashMap<>() {{
+ put(dir1b1, new HashMap<>() {{
put(new TopicIdPartition(topicA, 0), NONE);
put(new TopicIdPartition(topicA, 137),
UNKNOWN_TOPIC_OR_PARTITION);
put(new TopicIdPartition(topicB, 0), NONE);
put(new TopicIdPartition(topicC, 0),
NOT_LEADER_OR_FOLLOWER);
}});
- put(dir2b1, new HashMap<TopicIdPartition, Errors>() {{
+ put(dir2b1, new HashMap<>() {{
put(new TopicIdPartition(topicA, 1), NONE);
put(new
TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1),
UNKNOWN_TOPIC_ID);
}});
- put(offlineDir, new HashMap<TopicIdPartition, Errors>() {{
+ put(offlineDir, new HashMap<>() {{
put(new TopicIdPartition(topicA, 2), NONE);
}});
- put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>()
{{
+ put(DirectoryId.LOST, new HashMap<>() {{
put(new TopicIdPartition(topicB, 1), NONE);
}});
}})), AssignmentsHelper.normalize(controllerResult.response()));
@@ -3326,13 +3326,13 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(b1, b2);
Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{b1, b2},
new int[]{b1, b2}}).topicId();
Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{b1, b2},
new int[]{b1, b2}}).topicId();
- ctx.assignReplicasToDirs(b1, new HashMap<TopicIdPartition, Uuid>() {{
+ ctx.assignReplicasToDirs(b1, new HashMap<>() {{
put(new TopicIdPartition(topicA, 0), dir1b1);
put(new TopicIdPartition(topicA, 1), dir2b1);
put(new TopicIdPartition(topicB, 0), dir1b1);
put(new TopicIdPartition(topicB, 1), dir2b1);
}});
- ctx.assignReplicasToDirs(b2, new HashMap<TopicIdPartition, Uuid>() {{
+ ctx.assignReplicasToDirs(b2, new HashMap<>() {{
put(new TopicIdPartition(topicA, 0), dir1b2);
put(new TopicIdPartition(topicA, 1), dir2b2);
put(new TopicIdPartition(topicB, 0), dir1b2);
diff --git
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index db3c2e0ad16..a6b1bb78580 100644
---
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -117,12 +117,12 @@ public class SnapshotGeneratorTest {
setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
build()) {
// Publish a log delta batch. This one will not trigger a snapshot
yet.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().build());
// Publish a log delta batch. This will trigger a snapshot.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().build());
// Publish a log delta batch. This one will be ignored because
there are other images
// queued for writing.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().numBytes(2000).build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().numBytes(2000).build());
assertEquals(List.of(), emitter.images());
emitter.setReady();
}
@@ -140,9 +140,9 @@ public class SnapshotGeneratorTest {
setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
build()) {
// None of these log delta batches should trigger a snapshot since
their offset is not batch aligned.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
assertEquals(List.of(), emitter.images());
emitter.setReady();
}
@@ -162,10 +162,10 @@ public class SnapshotGeneratorTest {
setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
build()) {
// These should not be published despite meeting the max bytes
threshold since they are not batch aligned.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
notBatchAlignedLogDeltaManifestBuilder().build());
// This snapshot should get published since it is batch aligned.
- generator.publishLogDelta(TEST_DELTA, batchAlignedImage,
logDeltaManifestBuilder().build());
+ generator.publishLogDelta(batchAlignedImage,
logDeltaManifestBuilder().build());
assertEquals(List.of(), emitter.images());
emitter.setReady();
}
@@ -186,7 +186,7 @@ public class SnapshotGeneratorTest {
build()) {
disabledReason.compareAndSet(null, "we are testing disable()");
// No snapshots are generated because snapshots are disabled.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().build());
}
assertEquals(List.of(), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -204,15 +204,15 @@ public class SnapshotGeneratorTest {
setMaxTimeSinceLastSnapshotNs(TimeUnit.MINUTES.toNanos(30)).
build()) {
// This image isn't published yet.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().numBytes(50).build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().numBytes(50).build());
assertEquals(List.of(), emitter.images());
mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
// Next image is published because of the time delay.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().numBytes(50).build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().numBytes(50).build());
TestUtils.waitForCondition(() -> emitter.images().size() == 1,
"images.size == 1");
// bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
// so this does not trigger a new snapshot.
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
logDeltaManifestBuilder().numBytes(150).build());
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().numBytes(150).build());
}
assertEquals(List.of(TEST_IMAGE), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -227,7 +227,7 @@ public class SnapshotGeneratorTest {
setMaxBytesSinceLastSnapshot(200).
build()) {
for (int i = 0; i < 2; i++) {
- generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+ generator.publishLogDelta(TEST_IMAGE,
logDeltaManifestBuilder().elapsedNs(10000).numBytes(50000).build());
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 21bd04e2643..c022ae79893 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -22,12 +22,8 @@ import
org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Message;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
-import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.MockRandom;
@@ -35,7 +31,6 @@ import org.apache.kafka.server.util.MockRandom;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,7 +42,6 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -86,13 +80,6 @@ public class RecordTestUtils {
}
}
- public static void replayOne(
- Object target,
- ApiMessageAndVersion recordAndVersion
- ) {
- replayAll(target, List.of(recordAndVersion));
- }
-
public static <T extends ApiMessage> Optional<T> recordAtIndexAs(
Class<T> recordClazz,
List<ApiMessageAndVersion> recordsAndVersions,
@@ -217,19 +204,6 @@ public class RecordTestUtils {
}
}
- /**
- * Replay a list of record batches.
- *
- * @param target The object to invoke the replay function on.
- * @param batches A list of batches of records.
- */
- public static void replayAllBatches(Object target,
- List<List<ApiMessageAndVersion>>
batches) {
- for (List<ApiMessageAndVersion> batch : batches) {
- replayAll(target, batch);
- }
- }
-
/**
* Materialize the output of an iterator into a set.
*
@@ -245,27 +219,6 @@ public class RecordTestUtils {
return set;
}
- /**
- * Assert that a batch iterator yields a given set of record batches.
- *
- * @param batches A list of record batches.
- * @param iterator The input iterator.
- */
- public static void
assertBatchIteratorContains(List<List<ApiMessageAndVersion>> batches,
-
Iterator<List<ApiMessageAndVersion>> iterator) throws Exception {
- List<List<ApiMessageAndVersion>> actual = new ArrayList<>();
- while (iterator.hasNext()) {
- actual.add(new ArrayList<>(iterator.next()));
- }
- deepSortRecords(actual);
- List<List<ApiMessageAndVersion>> expected = new ArrayList<>();
- for (List<ApiMessageAndVersion> batch : batches) {
- expected.add(new ArrayList<>(batch));
- }
- deepSortRecords(expected);
- assertEquals(expected, actual);
- }
-
/**
* Sort the contents of an object which contains records.
*
@@ -297,48 +250,6 @@ public class RecordTestUtils {
}
}
- /**
- * Create a batch reader for testing.
- *
- * @param lastOffset the last offset of the given list of records
- * @param appendTimestamp the append timestamp for the batches created
- * @param records the records
- * @return a batch reader which will return the given records
- */
- public static BatchReader<ApiMessageAndVersion> mockBatchReader(
- long lastOffset,
- long appendTimestamp,
- List<ApiMessageAndVersion> records
- ) {
- List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
- long offset = lastOffset - records.size() + 1;
- Iterator<ApiMessageAndVersion> iterator = records.iterator();
- List<ApiMessageAndVersion> curRecords = new ArrayList<>();
- assertTrue(iterator.hasNext()); // At least one record is required
- while (true) {
- if (!iterator.hasNext() || curRecords.size() >= 2) {
- batches.add(Batch.data(offset, 0, appendTimestamp,
sizeInBytes(curRecords), curRecords));
- if (!iterator.hasNext()) {
- break;
- }
- offset += curRecords.size();
- curRecords = new ArrayList<>();
- }
- curRecords.add(iterator.next());
- }
- return MemoryBatchReader.of(batches, __ -> { });
- }
-
-
- private static int sizeInBytes(List<ApiMessageAndVersion> records) {
- int size = 0;
- for (ApiMessageAndVersion record : records) {
- ObjectSerializationCache cache = new ObjectSerializationCache();
- size += MetadataRecordSerde.INSTANCE.recordSize(record, cache);
- }
- return size;
- }
-
public static ApiMessageAndVersion testRecord(int index) {
MockRandom random = new MockRandom(index);
return new ApiMessageAndVersion(
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 834aa65cb69..869fdd159bc 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -56,7 +56,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
@@ -252,8 +251,6 @@ public class StandardAuthorizerTest {
return authorizer;
}
- private static final AtomicLong NEXT_ID = new AtomicLong(0);
-
static StandardAcl newFooAcl(AclOperation op, AclPermissionType
permission) {
return new StandardAcl(
TOPIC,
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 4ebec345b65..8e403b9c9e1 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -64,7 +64,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.IntStream;
/**
* The LocalLogManager is a test implementation that relies on the contents of
memory.
@@ -323,35 +322,6 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
}
- /**
- * Returns the snapshot whose last offset is the committed offset.
- *
- * If such snapshot doesn't exist, it waits until it does.
- */
- synchronized RawSnapshotReader waitForSnapshot(long committedOffset)
throws InterruptedException {
- while (true) {
- RawSnapshotReader reader = snapshots.get(committedOffset);
- if (reader != null) {
- return reader;
- } else {
- this.wait();
- }
- }
- }
-
- /**
- * Returns the latest snapshot.
- *
- * If a snapshot doesn't exists, it waits until it does.
- */
- synchronized RawSnapshotReader waitForLatestSnapshot() throws
InterruptedException {
- while (snapshots.isEmpty()) {
- this.wait();
- }
-
- return Objects.requireNonNull(snapshots.lastEntry()).getValue();
- }
-
/**
* Returns the snapshot id of the latest snapshot if there is one.
*
@@ -361,27 +331,6 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
return Optional.ofNullable(snapshots.lastEntry()).map(entry ->
entry.getValue().snapshotId());
}
- synchronized long appendedBytes() {
- ObjectSerializationCache objectCache = new
ObjectSerializationCache();
-
- return batches
- .values()
- .stream()
- .flatMapToInt(batch -> {
- if (batch instanceof LocalRecordBatch localBatch) {
- return localBatch.records.stream().mapToInt(record ->
messageSize(record, objectCache));
- } else {
- return IntStream.empty();
- }
- })
- .sum();
- }
-
- public SharedLogData setInitialMaxReadOffset(long
initialMaxReadOffset) {
- this.initialMaxReadOffset = initialMaxReadOffset;
- return this;
- }
-
public long initialMaxReadOffset() {
return initialMaxReadOffset;
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 661c96a3a8b..a1e6742f8ef 100644
---
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -175,14 +175,6 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
return clusterId;
}
- AtomicReference<String> firstError() {
- return firstError;
- }
-
- File dir() {
- return dir;
- }
-
LeaderAndEpoch waitForLeader() throws InterruptedException {
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
@@ -219,18 +211,6 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
}
}
- public RawSnapshotReader waitForSnapshot(long committedOffset) throws
InterruptedException {
- return shared.waitForSnapshot(committedOffset);
- }
-
- public RawSnapshotReader waitForLatestSnapshot() throws
InterruptedException {
- return shared.waitForLatestSnapshot();
- }
-
- public long appendedBytes() {
- return shared.appendedBytes();
- }
-
public LeaderAndEpoch leaderAndEpoch() {
return shared.leaderAndEpoch();
}