This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41442903352 MINOR: Cleanup metadata module (#18937)
41442903352 is described below

commit 41442903352930c30db5569b7f9ba2a4e7897a11
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Mar 31 14:46:21 2025 +0500

    MINOR: Cleanup metadata module (#18937)
    
    Removed unused code and fixed IDEA warnings.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/metadata/ScramPublisher.scala     |  2 +-
 .../org/apache/kafka/controller/BrokersToIsrs.java |  4 -
 .../kafka/controller/ClusterControlManager.java    |  2 +-
 .../kafka/controller/OffsetControlManager.java     |  5 --
 .../apache/kafka/controller/QuorumController.java  | 10 ---
 .../apache/kafka/controller/QuorumFeatures.java    |  4 -
 .../controller/ReplicationControlManager.java      |  5 +-
 .../controller/errors/ControllerExceptions.java    | 15 ----
 .../java/org/apache/kafka/image/ClusterImage.java  |  4 -
 .../java/org/apache/kafka/image/TopicDelta.java    | 14 ----
 .../kafka/image/publisher/SnapshotGenerator.java   | 17 +----
 .../org/apache/kafka/metadata/LeaderAndIsr.java    | 21 -----
 .../kafka/metadata/PartitionRegistration.java      |  7 --
 .../apache/kafka/metadata/ScramCredentialData.java |  2 +-
 .../org/apache/kafka/metadata/VersionRange.java    |  7 --
 .../kafka/metadata/util/SnapshotFileReader.java    |  4 -
 .../ConfigurationControlManagerTest.java           |  8 +-
 .../controller/ProducerIdControlManagerTest.java   |  3 +-
 .../QuorumControllerIntegrationTestUtils.java      |  4 -
 .../kafka/controller/QuorumControllerTest.java     | 14 ----
 .../controller/ReplicationControlManagerTest.java  | 16 ++--
 .../image/publisher/SnapshotGeneratorTest.java     | 28 +++----
 .../org/apache/kafka/metadata/RecordTestUtils.java | 89 ----------------------
 .../authorizer/StandardAuthorizerTest.java         |  3 -
 .../org/apache/kafka/metalog/LocalLogManager.java  | 51 -------------
 .../kafka/metalog/LocalLogManagerTestEnv.java      | 20 -----
 26 files changed, 35 insertions(+), 324 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
index 09789249571..818e01fa5f8 100644
--- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
@@ -56,7 +56,7 @@ class ScramPublisher(
             userChanges.forEach {
               case (userName, change) =>
                 if (change.isPresent) {
-                  credentialProvider.updateCredential(mechanism, userName, change.get().toCredential(mechanism))
+                  credentialProvider.updateCredential(mechanism, userName, change.get().toCredential)
                 } else {
                   credentialProvider.removeCredentials(mechanism, userName)
                 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 24d3ed62b02..649fed99e1f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -278,10 +278,6 @@ public class BrokersToIsrs {
         return iterator(NO_LEADER, true);
     }
 
-    PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
-        return iterator(brokerId, true);
-    }
-
     PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
         return iterator(brokerId, false);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index dd50b45628a..c9064f991f0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -521,7 +521,7 @@ public class ClusterControlManager {
      * @param brokerId      The broker id to track.
      * @param brokerEpoch   The broker epoch to track.
      *
-     * @returns             True only if the ClusterControlManager is active.
+     * @return              True only if the ClusterControlManager is active.
      */
     boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
         BrokerHeartbeatManager manager = heartbeatManager;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 99375af6141..5c496877747 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -418,9 +418,4 @@ class OffsetControlManager {
         log.info("Replayed {} at offset {}. Reverted to offset {}.",
                 message, offset, preTransactionOffset);
     }
-
-    // VisibleForTesting
-    void setNextWriteOffset(long newNextWriteOffset) {
-        this.nextWriteOffset = newNextWriteOffset;
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 56e3848ed32..bf2e26834ca 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -177,11 +177,6 @@ public final class QuorumController implements Controller {
      */
     private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;
 
-    /**
-     * The default minimum event time that can be logged as a slow event.
-     */
-    private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;
-
     /**
      * The maximum records any user-initiated operation is allowed to generate.
      *
@@ -677,11 +672,6 @@ public final class QuorumController implements Controller {
         }
     }
 
-    // Visible for testing
-    OffsetControlManager offsetControl() {
-        return offsetControl;
-    }
-
     // Visible for testing
     ReplicationControlManager replicationControl() {
         return replicationControl;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 7817fdc3ee0..e54f0604242 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -84,10 +84,6 @@ public final class QuorumFeatures {
         return nodeId;
     }
 
-    public Map<String, VersionRange> localSupportedFeatures() {
-        return localSupportedFeatures;
-    }
-
     public List<Integer> quorumNodeIds() {
         return quorumNodeIds;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 0b2c8ed544b..22c8de053a0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -2071,9 +2071,8 @@ public class ReplicationControlManager {
                     alterPartitionReassignment(topic.name(), partition, records, allowRFChange);
                     successfulAlterations++;
                 } catch (Throwable e) {
-                    log.info("Unable to alter partition reassignment for " +
-                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
-                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    log.info("Unable to alter partition reassignment for {}:{} 
because of an {} error: {}",
+                            topic.name(), partition.partitionIndex(), 
e.getClass().getSimpleName(), e.getMessage());
                     error = ApiError.fromThrowable(e);
                 }
                 totalAlterations++;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
index 3c742749383..15b2940784d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
@@ -40,21 +40,6 @@ public class ControllerExceptions {
         return exception instanceof TimeoutException;
     }
 
-    /**
-     * Check if an exception is a NotController exception.
-     *
-     * @param exception     The exception to check.
-     * @return              True if the exception is a NotController exception.
-     */
-    public static boolean isNotControllerException(Throwable exception) {
-        if (exception == null) return false;
-        if (exception instanceof ExecutionException) {
-            exception = exception.getCause();
-            if (exception == null) return false;
-        }
-        return exception instanceof NotControllerException;
-    }
-
     /**
     * Create a new exception indicating that the controller is in pre-migration mode, so the
      * operation cannot be completed.
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 713e62fadc6..22166f1c2a3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -66,10 +66,6 @@ public final class ClusterImage {
         return controllers;
     }
 
-    public boolean containsBroker(int brokerId) {
-        return brokers.containsKey(brokerId);
-    }
-
     public long brokerEpoch(int brokerId) {
         BrokerRegistration brokerRegistration = broker(brokerId);
         if (brokerRegistration == null) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 00e4a422341..fdb26e8e7df 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -129,20 +129,6 @@ public final class TopicDelta {
         return new TopicImage(image.name(), image.id(), newPartitions);
     }
 
-    public boolean hasPartitionsWithAssignmentChanges() {
-        for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
-            int partitionId = entry.getKey();
-            // New Partition.
-            if (!image.partitions().containsKey(partitionId))
-                return true;
-            PartitionRegistration previousPartition = image.partitions().get(partitionId);
-            PartitionRegistration currentPartition = entry.getValue();
-            if (!previousPartition.hasSameAssignment(currentPartition))
-                return true;
-        }
-        return false;
-    }
-
     /**
      * Find the partitions that have change based on the replica given.
      * <p>
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
index 501fb2ee4d3..a8483f8f8b3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -23,7 +23,6 @@ import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.loader.LoaderManifest;
 import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.image.loader.SnapshotManifest;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.server.fault.FaultHandler;
@@ -217,28 +216,20 @@ public class SnapshotGenerator implements MetadataPublisher {
     ) {
         switch (manifest.type()) {
             case LOG_DELTA:
-                publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+                publishLogDelta(newImage, (LogDeltaManifest) manifest);
                 break;
             case SNAPSHOT:
-                publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+                publishSnapshot(newImage);
                 break;
         }
     }
 
-    void publishSnapshot(
-        MetadataDelta delta,
-        MetadataImage newImage,
-        SnapshotManifest manifest
-    ) {
+    void publishSnapshot(MetadataImage newImage) {
         log.debug("Resetting the snapshot counters because we just read {}.", 
newImage.provenance().snapshotName());
         resetSnapshotCounters();
     }
 
-    void publishLogDelta(
-        MetadataDelta delta,
-        MetadataImage newImage,
-        LogDeltaManifest manifest
-    ) {
+    void publishLogDelta(MetadataImage newImage, LogDeltaManifest manifest) {
         bytesSinceLastSnapshot += manifest.numBytes();
         if (bytesSinceLastSnapshot >= maxBytesSinceLastSnapshot) {
             if (eventQueue.isEmpty()) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
index 615582870be..1a818dfca58 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java
@@ -26,9 +26,6 @@ public class LeaderAndIsr {
     public static final int INITIAL_LEADER_EPOCH = 0;
     public static final int INITIAL_PARTITION_EPOCH = 0;
     public static final int NO_LEADER = -1;
-    public static final int NO_EPOCH = -1;
-    public static final int LEADER_DURING_DELETE = -2;
-    public static final int EPOCH_DURING_DELETE = -2;
 
     private final int leader;
     private final int leaderEpoch;
@@ -74,10 +71,6 @@ public class LeaderAndIsr {
         this.partitionEpoch = partitionEpoch;
     }
 
-    public static LeaderAndIsr duringDelete(List<Integer> isr) {
-        return new LeaderAndIsr(LEADER_DURING_DELETE, isr);
-    }
-
     public int leader() {
         return leader;
     }
@@ -132,20 +125,6 @@ public class LeaderAndIsr {
                 .toList();
     }
 
-    public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) {
-        if (this == other) {
-            return true;
-        } else if (other == null) {
-            return false;
-        } else {
-            return leader == other.leader &&
-                    leaderEpoch == other.leaderEpoch &&
-                    isrWithBrokerEpoch.equals(other.isrWithBrokerEpoch) &&
-                    leaderRecoveryState == other.leaderRecoveryState &&
-                    partitionEpoch <= other.partitionEpoch;
-        }
-    }
-
     @Override
     public String toString() {
         return "LeaderAndIsr(" +
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 808c9809352..a8f9e166e1f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -470,11 +470,4 @@ public class PartitionRegistration {
                 ", partitionEpoch=" + partitionEpoch +
                 ")";
     }
-
-    public boolean hasSameAssignment(PartitionRegistration registration) {
-        return Arrays.equals(this.replicas, registration.replicas) &&
-            Arrays.equals(this.directories, registration.directories) &&
-            Arrays.equals(this.addingReplicas, registration.addingReplicas) &&
-            Arrays.equals(this.removingReplicas, registration.removingReplicas);
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
index c356ff0f047..4339a572b71 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
@@ -86,7 +86,7 @@ public final class ScramCredentialData {
                 setIterations(iterations);
     }
 
-    public ScramCredential toCredential(ScramMechanism mechanism) {
+    public ScramCredential toCredential() {
         return new ScramCredential(salt, storedKey, serverKey, iterations);
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
index b9ced14c488..47e0c74b850 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
@@ -56,13 +56,6 @@ public class VersionRange {
         return version >= min && version <= max;
     }
 
-    /**
-     * Check if a given version range has overlap with this one
-     */
-    public boolean intersects(VersionRange other) {
-        return other.min <= max && other.max >= min;
-    }
-
     @Override
     public int hashCode() {
         return Objects.hash(min, max);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 2fddf190989..359cfe4534e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -208,8 +208,4 @@ public final class SnapshotFileReader implements AutoCloseable {
         beginShutdown("closing");
         queue.close();
     }
-
-    public CompletableFuture<Void> caughtUpFuture() {
-        return caughtUpFuture;
-    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index cd2b3fa18c3..6954487c688 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -94,8 +94,6 @@ public class ConfigurationControlManagerTest {
     static {
         SYNONYMS.put("abc", List.of(new ConfigSynonym("foo.bar")));
         SYNONYMS.put("def", List.of(new ConfigSynonym("baz")));
-        SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
-            List.of(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
         SYNONYMS.put("quuux", List.of(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
         SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, List.of(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
     }
@@ -130,7 +128,7 @@ public class ConfigurationControlManagerTest {
     }
 
     @Test
-    public void testReplay() throws Exception {
+    public void testReplay() {
         ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
             setKafkaConfigSchema(SCHEMA).
             build();
@@ -314,7 +312,7 @@ public class ConfigurationControlManagerTest {
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
             // nothing to do
         }
 
@@ -379,7 +377,7 @@ public class ConfigurationControlManagerTest {
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
             // empty
         }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 95a1d3a3020..2fd589cdeed 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -154,12 +154,11 @@ public class ProducerIdControlManagerTest {
         assertEquals(new ProducerIdsBlock(3, 100000, 1000), producerIdControlManager.nextProducerBlock());
     }
 
-    static ProducerIdsBlock generateProducerIds(
+    static void generateProducerIds(
            ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) {
         ControllerResult<ProducerIdsBlock> result =
             producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch);
         result.records().forEach(apiMessageAndVersion ->
             producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message()));
-        return result.response();
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index 0157436ffb6..e37a9634a8d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -53,10 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class QuorumControllerIntegrationTestUtils {
     private static final Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
 
-    BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
-        return brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting());
-    }
-
     /**
      * Create a broker features collection for use in a registration request. We only set MV. here.
      *
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4eac17d924d..d2265a846c7 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,7 +97,6 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
 import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
-import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.metalog.LocalLogManager;
@@ -144,7 +143,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -1409,11 +1407,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private static final Uuid FOO_ID = Uuid.fromString("igRktLOnR8ektWHr79F8mw");
-
-    private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
-        IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), __ -> 0L));
-
     @Test
     public void testFatalMetadataReplayErrorOnActive() throws Throwable {
         try (
@@ -1476,13 +1469,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
-        for (int i = 0; i < authorizers.size(); i++) {
-            assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
-                "authorizer " + i + " should not have completed loading.");
-        }
-    }
-
     static class InitialSnapshot implements AutoCloseable {
         File tempDir;
         BatchFileWriter writer;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 239c48e2069..1b5bf4fb29d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -3241,7 +3241,7 @@ public class ReplicationControlManagerTest {
         Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, 
new int[]{1, 2}}).topicId();
         Uuid topicC = ctx.createTestTopic("c", new int[][]{new 
int[]{2}}).topicId();
 
-        ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
+        ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<>() {{
                 put(new TopicIdPartition(topicA, 0), dir1b1);
                 put(new TopicIdPartition(topicA, 1), dir2b1);
                 put(new TopicIdPartition(topicA, 2), offlineDir); // unknown/offline dir
@@ -3252,21 +3252,21 @@ public class ReplicationControlManagerTest {
                 put(new TopicIdPartition(topicC, 0), dir1b1); // expect NOT_LEADER_OR_FOLLOWER
             }});
 
-        assertEquals(AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short) 0, 0, new HashMap<Uuid, Map<TopicIdPartition, Errors>>() {{
-                put(dir1b1, new HashMap<TopicIdPartition, Errors>() {{
+        assertEquals(AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short) 0, 0, new HashMap<>() {{
+                put(dir1b1, new HashMap<>() {{
                         put(new TopicIdPartition(topicA, 0), NONE);
                         put(new TopicIdPartition(topicA, 137), UNKNOWN_TOPIC_OR_PARTITION);
                         put(new TopicIdPartition(topicB, 0), NONE);
                         put(new TopicIdPartition(topicC, 0), NOT_LEADER_OR_FOLLOWER);
                     }});
-                put(dir2b1, new HashMap<TopicIdPartition, Errors>() {{
+                put(dir2b1, new HashMap<>() {{
                         put(new TopicIdPartition(topicA, 1), NONE);
                         put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), UNKNOWN_TOPIC_ID);
                     }});
-                put(offlineDir, new HashMap<TopicIdPartition, Errors>() {{
+                put(offlineDir, new HashMap<>() {{
                         put(new TopicIdPartition(topicA, 2), NONE);
                     }});
-                put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>() {{
+                put(DirectoryId.LOST, new HashMap<>() {{
                         put(new TopicIdPartition(topicB, 1), NONE);
                     }});
             }})), AssignmentsHelper.normalize(controllerResult.response()));
@@ -3326,13 +3326,13 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(b1, b2);
         Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{b1, b2}, 
new int[]{b1, b2}}).topicId();
         Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{b1, b2}, 
new int[]{b1, b2}}).topicId();
-        ctx.assignReplicasToDirs(b1, new HashMap<TopicIdPartition, Uuid>() {{
+        ctx.assignReplicasToDirs(b1, new HashMap<>() {{
                 put(new TopicIdPartition(topicA, 0), dir1b1);
                 put(new TopicIdPartition(topicA, 1), dir2b1);
                 put(new TopicIdPartition(topicB, 0), dir1b1);
                 put(new TopicIdPartition(topicB, 1), dir2b1);
             }});
-        ctx.assignReplicasToDirs(b2, new HashMap<TopicIdPartition, Uuid>() {{
+        ctx.assignReplicasToDirs(b2, new HashMap<>() {{
                 put(new TopicIdPartition(topicA, 0), dir1b2);
                 put(new TopicIdPartition(topicA, 1), dir2b2);
                 put(new TopicIdPartition(topicB, 0), dir1b2);
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index db3c2e0ad16..a6b1bb78580 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -117,12 +117,12 @@ public class SnapshotGeneratorTest {
                 setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
                 build()) {
             // Publish a log delta batch. This one will not trigger a snapshot yet.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().build());
             // Publish a log delta batch. This will trigger a snapshot.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().build());
             // Publish a log delta batch. This one will be ignored because there are other images
             // queued for writing.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(2000).build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().numBytes(2000).build());
             assertEquals(List.of(), emitter.images());
             emitter.setReady();
         }
@@ -140,9 +140,9 @@ public class SnapshotGeneratorTest {
                 setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
                 build()) {
             // None of these log delta batches should trigger a snapshot since their offset is not batch aligned.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
             assertEquals(List.of(), emitter.images());
             emitter.setReady();
         }
@@ -162,10 +162,10 @@ public class SnapshotGeneratorTest {
                 setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
                 build()) {
             // These should not be published despite meeting the max bytes threshold since they are not batch aligned.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, notBatchAlignedLogDeltaManifestBuilder().build());
             // This snapshot should get published since it is batch aligned.
-            generator.publishLogDelta(TEST_DELTA, batchAlignedImage, logDeltaManifestBuilder().build());
+            generator.publishLogDelta(batchAlignedImage, logDeltaManifestBuilder().build());
             assertEquals(List.of(), emitter.images());
             emitter.setReady();
         }
@@ -186,7 +186,7 @@ public class SnapshotGeneratorTest {
                 build()) {
             disabledReason.compareAndSet(null, "we are testing disable()");
             // No snapshots are generated because snapshots are disabled.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().build());
         }
         assertEquals(List.of(), emitter.images());
         faultHandler.maybeRethrowFirstException();
@@ -204,15 +204,15 @@ public class SnapshotGeneratorTest {
                 setMaxTimeSinceLastSnapshotNs(TimeUnit.MINUTES.toNanos(30)).
                 build()) {
             // This image isn't published yet.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
             assertEquals(List.of(), emitter.images());
             mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
             // Next image is published because of the time delay.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
             TestUtils.waitForCondition(() -> emitter.images().size() == 1, "images.size == 1");
             // bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
             // so this does not trigger a new snapshot.
-            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(150).build());
+            generator.publishLogDelta(TEST_IMAGE, logDeltaManifestBuilder().numBytes(150).build());
         }
         assertEquals(List.of(TEST_IMAGE), emitter.images());
         faultHandler.maybeRethrowFirstException();
@@ -227,7 +227,7 @@ public class SnapshotGeneratorTest {
                 setMaxBytesSinceLastSnapshot(200).
                 build()) {
             for (int i = 0; i < 2; i++) {
-                generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                generator.publishLogDelta(TEST_IMAGE,
                     logDeltaManifestBuilder().elapsedNs(10000).numBytes(50000).build());
             }
         }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 21bd04e2643..c022ae79893 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -22,12 +22,8 @@ import org.apache.kafka.common.metadata.RegisterControllerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Message;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
-import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.internals.MemoryBatchReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.MockRandom;
@@ -35,7 +31,6 @@ import org.apache.kafka.server.util.MockRandom;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,7 +42,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 /**
@@ -86,13 +80,6 @@ public class RecordTestUtils {
         }
     }
 
-    public static void replayOne(
-        Object target,
-        ApiMessageAndVersion recordAndVersion
-    ) {
-        replayAll(target, List.of(recordAndVersion));
-    }
-
     public static <T extends ApiMessage> Optional<T> recordAtIndexAs(
             Class<T> recordClazz,
             List<ApiMessageAndVersion> recordsAndVersions,
@@ -217,19 +204,6 @@ public class RecordTestUtils {
         }
     }
 
-    /**
-     * Replay a list of record batches.
-     *
-     * @param target        The object to invoke the replay function on.
-     * @param batches       A list of batches of records.
-     */
-    public static void replayAllBatches(Object target,
-                                        List<List<ApiMessageAndVersion>> batches) {
-        for (List<ApiMessageAndVersion> batch : batches) {
-            replayAll(target, batch);
-        }
-    }
-
     /**
      * Materialize the output of an iterator into a set.
      *
@@ -245,27 +219,6 @@ public class RecordTestUtils {
         return set;
     }
 
-    /**
-     * Assert that a batch iterator yields a given set of record batches.
-     *
-     * @param batches       A list of record batches.
-     * @param iterator      The input iterator.
-     */
-    public static void assertBatchIteratorContains(List<List<ApiMessageAndVersion>> batches,
-                                                   Iterator<List<ApiMessageAndVersion>> iterator) throws Exception {
-        List<List<ApiMessageAndVersion>> actual = new ArrayList<>();
-        while (iterator.hasNext()) {
-            actual.add(new ArrayList<>(iterator.next()));
-        }
-        deepSortRecords(actual);
-        List<List<ApiMessageAndVersion>> expected = new ArrayList<>();
-        for (List<ApiMessageAndVersion> batch : batches) {
-            expected.add(new ArrayList<>(batch));
-        }
-        deepSortRecords(expected);
-        assertEquals(expected, actual);
-    }
-
     /**
      * Sort the contents of an object which contains records.
      *
@@ -297,48 +250,6 @@ public class RecordTestUtils {
         }
     }
 
-    /**
-     * Create a batch reader for testing.
-     *
-     * @param lastOffset the last offset of the given list of records
-     * @param appendTimestamp the append timestamp for the batches created
-     * @param records the records
-     * @return a batch reader which will return the given records
-     */
-    public static BatchReader<ApiMessageAndVersion> mockBatchReader(
-        long lastOffset,
-        long appendTimestamp,
-        List<ApiMessageAndVersion> records
-    ) {
-        List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
-        long offset = lastOffset - records.size() + 1;
-        Iterator<ApiMessageAndVersion> iterator = records.iterator();
-        List<ApiMessageAndVersion> curRecords = new ArrayList<>();
-        assertTrue(iterator.hasNext()); // At least one record is required
-        while (true) {
-            if (!iterator.hasNext() || curRecords.size() >= 2) {
-                batches.add(Batch.data(offset, 0, appendTimestamp, sizeInBytes(curRecords), curRecords));
-                if (!iterator.hasNext()) {
-                    break;
-                }
-                offset += curRecords.size();
-                curRecords = new ArrayList<>();
-            }
-            curRecords.add(iterator.next());
-        }
-        return MemoryBatchReader.of(batches, __ -> { });
-    }
-
-
-    private static int sizeInBytes(List<ApiMessageAndVersion> records) {
-        int size = 0;
-        for (ApiMessageAndVersion record : records) {
-            ObjectSerializationCache cache = new ObjectSerializationCache();
-            size += MetadataRecordSerde.INSTANCE.recordSize(record, cache);
-        }
-        return size;
-    }
-
     public static ApiMessageAndVersion testRecord(int index) {
         MockRandom random = new MockRandom(index);
         return new ApiMessageAndVersion(
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 834aa65cb69..869fdd159bc 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.kafka.common.acl.AclOperation.ALL;
 import static org.apache.kafka.common.acl.AclOperation.ALTER;
@@ -252,8 +251,6 @@ public class StandardAuthorizerTest {
         return authorizer;
     }
 
-    private static final AtomicLong NEXT_ID = new AtomicLong(0);
-
     static StandardAcl newFooAcl(AclOperation op, AclPermissionType permission) {
         return new StandardAcl(
             TOPIC,
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 4ebec345b65..8e403b9c9e1 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -64,7 +64,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.IntStream;
 
 /**
  * The LocalLogManager is a test implementation that relies on the contents of memory.
@@ -323,35 +322,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             }
         }
 
-        /**
-         * Returns the snapshot whose last offset is the committed offset.
-         *
-         * If such snapshot doesn't exist, it waits until it does.
-         */
-        synchronized RawSnapshotReader waitForSnapshot(long committedOffset) throws InterruptedException {
-            while (true) {
-                RawSnapshotReader reader = snapshots.get(committedOffset);
-                if (reader != null) {
-                    return reader;
-                } else {
-                    this.wait();
-                }
-            }
-        }
-
-        /**
-         * Returns the latest snapshot.
-         *
-         * If a snapshot doesn't exists, it waits until it does.
-         */
-        synchronized RawSnapshotReader waitForLatestSnapshot() throws InterruptedException {
-            while (snapshots.isEmpty()) {
-                this.wait();
-            }
-
-            return Objects.requireNonNull(snapshots.lastEntry()).getValue();
-        }
-
         /**
          * Returns the snapshot id of the latest snapshot if there is one.
          *
@@ -361,27 +331,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             return Optional.ofNullable(snapshots.lastEntry()).map(entry -> entry.getValue().snapshotId());
         }
 
-        synchronized long appendedBytes() {
-            ObjectSerializationCache objectCache = new ObjectSerializationCache();
-
-            return batches
-                .values()
-                .stream()
-                .flatMapToInt(batch -> {
-                    if (batch instanceof LocalRecordBatch localBatch) {
-                        return localBatch.records.stream().mapToInt(record -> messageSize(record, objectCache));
-                    } else {
-                        return IntStream.empty();
-                    }
-                })
-                .sum();
-        }
-
-        public SharedLogData setInitialMaxReadOffset(long initialMaxReadOffset) {
-            this.initialMaxReadOffset = initialMaxReadOffset;
-            return this;
-        }
-
         public long initialMaxReadOffset() {
             return initialMaxReadOffset;
         }
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 661c96a3a8b..a1e6742f8ef 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -175,14 +175,6 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
         return clusterId;
     }
 
-    AtomicReference<String> firstError() {
-        return firstError;
-    }
-
-    File dir() {
-        return dir;
-    }
-
     LeaderAndEpoch waitForLeader() throws InterruptedException {
         AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
         TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
@@ -219,18 +211,6 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
         }
     }
 
-    public RawSnapshotReader waitForSnapshot(long committedOffset) throws InterruptedException {
-        return shared.waitForSnapshot(committedOffset);
-    }
-
-    public RawSnapshotReader waitForLatestSnapshot() throws InterruptedException {
-        return shared.waitForLatestSnapshot();
-    }
-
-    public long appendedBytes() {
-        return shared.appendedBytes();
-    }
-
     public LeaderAndEpoch leaderAndEpoch() {
         return shared.leaderAndEpoch();
     }

