This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 025e791d0cdfc025fd064379f8ebb8b9fe380a26
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Thu Jun 6 21:26:08 2024 +0530

    MINOR: Cleanup the storage module unit tests (#16202)
    
    - Use SystemTime instead of MockTime when time is not mocked
    - Use static assertions to reduce the line length
    - Fold the lines if it exceeds the limit
    - rename tp0 to tpId0 when it refers to TopicIdPartition
    
    Reviewers: Kuan-Po (Cooper) Tseng <brandb...@gmail.com>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../storage/RemoteLogMetadataFormatterTest.java    | 18 ++---
 .../storage/RemoteLogMetadataSerdeTest.java        | 38 +++++-----
 .../storage/RemoteLogMetadataTransformTest.java    | 40 +++++------
 ...picBasedRemoteLogMetadataManagerConfigTest.java | 53 +++++---------
 ...ogMetadataManagerMultipleSubscriptionsTest.java | 41 +++++------
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 25 +++----
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 56 ++++++++-------
 .../storage/RemoteLogMetadataManagerTest.java      | 83 ++++++++++++----------
 8 files changed, 169 insertions(+), 185 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index e3d1a2aee0c..1380a735fba 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -24,7 +24,6 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -35,6 +34,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
 
 public class RemoteLogMetadataFormatterTest {
     private static final Uuid TOPIC_ID = Uuid.randomUuid();
@@ -51,12 +51,12 @@ public class RemoteLogMetadataFormatterTest {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
SEGMENT_ID);
         Optional<CustomMetadata> customMetadata = Optional.of(new 
CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new 
RemoteLogSegmentMetadata(
-                remoteLogSegmentId, 0L, 100L, -1L, 1,
-                123L, 1024, customMetadata,
-                RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
+                remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, 
customMetadata, COPY_SEGMENT_STARTED,
+                segLeaderEpochs);
 
         byte[] metadataBytes = new 
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
-        ConsumerRecord<byte[], byte[]> metadataRecord = new 
ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
+        ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
+                "__remote_log_metadata", 0, 0, null, metadataBytes);
 
         String expected = String.format(
                 "partition: 0, offset: 0, value: " +
@@ -68,9 +68,11 @@ public class RemoteLogMetadataFormatterTest {
                 TOPIC_ID, SEGMENT_ID);
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              PrintStream ps = new PrintStream(baos)) {
-            RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new 
RemoteLogMetadataSerde.RemoteLogMetadataFormatter();
-            formatter.writeTo(metadataRecord, ps);
-            assertEquals(expected, baos.toString());
+            try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter =
+                         new 
RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) {
+                formatter.writeTo(metadataRecord, ps);
+                assertEquals(expected, baos.toString());
+            }
         }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 5b48790c7fd..b1b91dacf23 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -19,48 +19,48 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
-import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 public class RemoteLogMetadataSerdeTest {
 
     public static final String TOPIC = "foo";
     private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
 
     @Test
     public void testRemoteLogSegmentMetadataSerde() {
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
createRemoteLogSegmentMetadata();
-
         doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
     }
 
     @Test
     public void testRemoteLogSegmentMetadataUpdateSerde() {
         RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = 
createRemoteLogSegmentMetadataUpdate();
-
         doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
     }
 
     @Test
     public void testRemotePartitionDeleteMetadataSerde() {
         RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = 
createRemotePartitionDeleteMetadata();
-
         doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
     }
 
@@ -70,24 +70,19 @@ public class RemoteLogMetadataSerdeTest {
         segLeaderEpochs.put(1, 20L);
         segLeaderEpochs.put(2, 80L);
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
-        return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
-                                            time.milliseconds(), 1024,
-                                            Optional.of(new CustomMetadata(new 
byte[] {0, 1, 2, 3})),
-                                            
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
-                                            segLeaderEpochs
-        );
+        return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1, 
+                time.milliseconds(), 1024, Optional.of(new CustomMetadata(new 
byte[] {0, 1, 2, 3})),
+                COPY_SEGMENT_STARTED, segLeaderEpochs);
     }
 
     private RemoteLogSegmentMetadataUpdate 
createRemoteLogSegmentMetadataUpdate() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
time.milliseconds(),
-                                                  Optional.of(new 
CustomMetadata(new byte[] {0, 1, 2, 3})),
-                                                  
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
+                Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), 
COPY_SEGMENT_FINISHED, 2);
     }
 
     private RemotePartitionDeleteMetadata 
createRemotePartitionDeleteMetadata() {
-        return new RemotePartitionDeleteMetadata(TP0, 
RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
-                                                 time.milliseconds(), 0);
+        return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, 
time.milliseconds(), 0);
     }
 
     private void doTestRemoteLogMetadataSerde(RemoteLogMetadata 
remoteLogMetadata) {
@@ -96,16 +91,17 @@ public class RemoteLogMetadataSerdeTest {
         byte[] metadataBytes = serializer.serialize(remoteLogMetadata);
 
         // Deserialize the bytes and check the RemoteLogMetadata object is as 
expected.
-        // Created another RemoteLogMetadataSerde instance to depict the real 
usecase of serializer and deserializer having their own instances.
+        // Created another RemoteLogMetadataSerde instance to depict the real 
usecase of serializer and 
+        // deserializer having their own instances.
         RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
         RemoteLogMetadata deserializedRemoteLogMetadata = 
deserializer.deserialize(metadataBytes);
-        Assertions.assertEquals(remoteLogMetadata, 
deserializedRemoteLogMetadata);
+        assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
     }
 
     @Test
     public void testInvalidRemoteStorageMetadata() {
         // Serializing receives an exception as it does not have the expected 
RemoteLogMetadata registered in serdes.
-        Assertions.assertThrows(IllegalArgumentException.class,
+        assertThrows(IllegalArgumentException.class,
             () -> new RemoteLogMetadataSerde().serialize(new 
InvalidRemoteLogMetadata(1, time.milliseconds())));
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 504f47e17a5..70770542eb9 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
@@ -29,60 +29,56 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
-import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.Optional;
 
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class RemoteLogMetadataTransformTest {
     private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
 
     @Test
     public void testRemoteLogSegmentMetadataTransform() {
         RemoteLogSegmentMetadataTransform metadataTransform = new 
RemoteLogSegmentMetadataTransform();
-
         RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata();
         ApiMessageAndVersion apiMessageAndVersion = 
metadataTransform.toApiMessageAndVersion(metadata);
         RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = 
metadataTransform
                 .fromApiMessageAndVersion(apiMessageAndVersion);
-
-        Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
+        assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
     }
 
     @Test
     public void testRemoteLogSegmentMetadataUpdateTransform() {
         RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new 
RemoteLogSegmentMetadataUpdateTransform();
-
-        RemoteLogSegmentMetadataUpdate metadataUpdate =
-                new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, 
Uuid.randomUuid()), time.milliseconds(),
-                                                   Optional.of(new 
CustomMetadata(new byte[]{0, 1, 2, 3})),
-                                                   
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
+        RemoteLogSegmentMetadataUpdate metadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+                new RemoteLogSegmentId(TP0, Uuid.randomUuid()), 
time.milliseconds(),
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), 
COPY_SEGMENT_FINISHED, 1);
         ApiMessageAndVersion apiMessageAndVersion = 
metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
-        RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = 
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
-
-        Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord);
+        RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord =
+                
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
+        assertEquals(metadataUpdate, metadataUpdateFromRecord);
     }
 
     private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
-                                            time.milliseconds(), 1024, 
Collections.singletonMap(0, 0L));
+                time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
     }
 
     @Test
     public void testRemoteLogPartitionMetadataTransform() {
         RemotePartitionDeleteMetadataTransform transform = new 
RemotePartitionDeleteMetadataTransform();
-
         RemotePartitionDeleteMetadata partitionDeleteMetadata
-                = new RemotePartitionDeleteMetadata(TP0, 
RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1);
+                = new RemotePartitionDeleteMetadata(TP0, 
DELETE_PARTITION_STARTED, time.milliseconds(), 1);
         ApiMessageAndVersion apiMessageAndVersion = 
transform.toApiMessageAndVersion(partitionDeleteMetadata);
-        RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = 
transform.fromApiMessageAndVersion(apiMessageAndVersion);
-
-        Assertions.assertEquals(partitionDeleteMetadata, 
partitionDeleteMetadataFromRecord);
+        RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord =
+                transform.fromApiMessageAndVersion(apiMessageAndVersion);
+        assertEquals(partitionDeleteMetadata, 
partitionDeleteMetadataFromRecord);
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 8e3985d0d5f..34f1fb08366 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -20,10 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.AbstractMap;
 import java.util.HashMap;
@@ -38,10 +35,10 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
 
-public class TopicBasedRemoteLogMetadataManagerConfigTest {
-    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    private static final String BOOTSTRAP_SERVERS = "localhost:9091";
+public class TopicBasedRemoteLogMetadataManagerConfigTest {
+    private static final String BOOTSTRAP_SERVERS = "localhost:2222";
 
     @Test
     public void testValidConfig() {
@@ -60,41 +57,32 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
 
         // Check for topic properties
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
-        
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), 
rlmmConfig.metadataTopicPartitionsCount());
+        assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), 
rlmmConfig.metadataTopicPartitionsCount());
 
         // Check for common client configs.
-        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
-        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
-        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
 
         for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
-            log.info("Checking config: " + entry.getKey());
-            Assertions.assertEquals(entry.getValue(),
-                                    
rlmmConfig.commonProperties().get(entry.getKey()));
-            Assertions.assertEquals(entry.getValue(),
-                                    
rlmmConfig.producerProperties().get(entry.getKey()));
-            Assertions.assertEquals(entry.getValue(),
-                                    
rlmmConfig.consumerProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.commonProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.producerProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.consumerProperties().get(entry.getKey()));
         }
-
         // Check for producer configs.
         for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
-            log.info("Checking config: " + entry.getKey());
-            Assertions.assertEquals(entry.getValue(),
-                                    
rlmmConfig.producerProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.producerProperties().get(entry.getKey()));
         }
-
         // Check for consumer configs.
         for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
-            log.info("Checking config: " + entry.getKey());
-            Assertions.assertEquals(entry.getValue(),
-                                    
rlmmConfig.consumerProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.consumerProperties().get(entry.getKey()));
         }
     }
 
     @Test
     public void testCommonProducerConsumerOverridesConfig() {
-        Map.Entry<String, Long> overrideEntry = new 
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 
60000L);
+        Map.Entry<String, Long> overrideEntry =
+                new 
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 
60000L);
         Map<String, Object> commonClientConfig = new HashMap<>();
         commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
         commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
1000L);
@@ -114,12 +102,9 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig);
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
 
-        Assertions.assertEquals(overrideCommonPropValue,
-                                
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
-        Assertions.assertEquals(overriddenProducerPropValue,
-                                
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
-        Assertions.assertEquals(overriddenConsumerPropValue,
-                                
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
+        assertEquals(overrideCommonPropValue, 
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
+        assertEquals(overriddenProducerPropValue, 
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
+        assertEquals(overriddenConsumerPropValue, 
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
     }
 
     private Map<String, Object> createValidConfigProps(Map<String, Object> 
commonClientConfig,
@@ -129,7 +114,6 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
         props.put(BROKER_ID, 1);
         props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());
-
         props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 
3);
         props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
         props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 
1000L);
@@ -138,17 +122,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest 
{
         for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
             props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
entry.getKey(), entry.getValue());
         }
-
         // producer configs
         for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
             props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), 
entry.getValue());
         }
-
         //consumer configs
         for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
             props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), 
entry.getValue());
         }
-
         return props;
     }
 }
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 916b475c143..d02da098040 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -27,12 +27,11 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -44,6 +43,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
@@ -55,10 +58,7 @@ import static org.mockito.Mockito.verify;
 @ClusterTestDefaults(brokers = 3)
 public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
     private final ClusterInstance clusterInstance;
-
-    private static final int SEG_SIZE = 1024 * 1024;
-
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
 
     
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance 
clusterInstance) {
         this.clusterInstance = clusterInstance;
@@ -125,24 +125,25 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
             // Add segments for these partitions but an exception is received 
as they have not yet been subscribed.
             // These messages would have been published to the respective 
metadata topic partitions but the ConsumerManager
             // has not yet been subscribing as they are not yet registered.
+            int segSize = 1048576;
             RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
-            ExecutionException exception = 
Assertions.assertThrows(ExecutionException.class, () -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
-            Assertions.assertEquals("org.apache.kafka.common.KafkaException: 
This consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
+            ExecutionException exception = 
assertThrows(ExecutionException.class,
+                    () -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+            assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
                 exception.getMessage());
 
             RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
-            exception = Assertions.assertThrows(ExecutionException.class, () 
-> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
-            Assertions.assertEquals("org.apache.kafka.common.KafkaException: 
This consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
+            exception = assertThrows(ExecutionException.class, () -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
+            assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
                 exception.getMessage());
 
             // `listRemoteLogSegments` will receive an exception as these 
topic partitions are not yet registered.
-            Assertions.assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
-            Assertions.assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
-
+            assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
+            assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
 
             
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
                     Collections.emptySet());
@@ -156,8 +157,8 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
 
             // leader partitions would have received as it is registered, but 
follower partition is not yet registered,
             // hence it throws an exception.
-            
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
-            Assertions.assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
+            
assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
+            assertThrows(RemoteStorageException.class, () -> 
remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
 
             // Register follower partition
             // Phaser::bulkRegister and Phaser::register provide the "countUp" 
feature
@@ -172,15 +173,15 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
             
verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
             
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
             // In this state, all the metadata should be available in RLMM for 
both leader and follower partitions.
-            
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(),
 "No segments found");
-            
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(),
 "No segments found");
+            
assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(),
 "No segments found");
+            
assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(),
 "No segments found");
         }
     }
 
     private void createTopic(String topic, Map<Integer, List<Integer>> 
replicasAssignments) {
         try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
             admin.createTopics(Collections.singletonList(new NewTopic(topic, 
replicasAssignments)));
-            Assertions.assertDoesNotThrow(() -> 
clusterInstance.waitForTopic(topic, replicasAssignments.size()));
+            assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 
replicasAssignments.size()));
         }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index dee686dc036..a08f12d9ae2 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -24,12 +24,11 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -37,14 +36,13 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
 @Tag("integration")
 public class TopicBasedRemoteLogMetadataManagerRestartTest {
 
-    private static final int SEG_SIZE = 1024 * 1024;
-
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
     private final String logDir = 
TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
     private final ClusterInstance clusterInstance;
 
@@ -76,16 +74,17 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
         clusterInstance.waitForTopic(leaderTopic, 1);
         clusterInstance.waitForTopic(followerTopic, 1);
 
-        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
-        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        int segSize = 1048576;
         RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
 
         try (TopicBasedRemoteLogMetadataManager 
topicBasedRemoteLogMetadataManager = 
createTopicBasedRemoteLogMetadataManager()) {
             // Register these partitions to RemoteLogMetadataManager.
@@ -115,12 +114,14 @@ public class 
TopicBasedRemoteLogMetadataManagerRestartTest {
             RemoteLogSegmentMetadata leaderSegmentMetadata2 = new 
RemoteLogSegmentMetadata(
                     new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                     101, 200, -1L, 0,
-                    time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
101L));
+                    time.milliseconds(), segSize, Collections.singletonMap(0, 
101L));
             
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
 
             // Check that both the stored segment and recently added segment 
are available.
-            
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata,
 leaderSegmentMetadata2).iterator(),
-                    
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)));
+            assertTrue(TestUtils.sameElementsWithoutOrder(
+                    Arrays.asList(leaderSegmentMetadata, 
leaderSegmentMetadata2).iterator(),
+                    
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+            );
         }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 183a8493412..8b9cfd0700c 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -25,14 +25,13 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
@@ -42,6 +41,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -50,11 +53,10 @@ import static org.mockito.Mockito.verify;
 @ExtendWith(ClusterTestExtensions.class)
 @ClusterTestDefaults(brokers = 3)
 public class TopicBasedRemoteLogMetadataManagerTest {
-    private static final int SEG_SIZE = 1024 * 1024;
-
+    private static final int SEG_SIZE = 1048576;
     private final ClusterInstance clusterInstance;
     private final RemotePartitionMetadataStore 
spyRemotePartitionMetadataEventHandler = spy(new 
RemotePartitionMetadataStore());
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
     private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager;
 
     TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
@@ -83,7 +85,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
             admin.createTopics(Collections.singletonList(new NewTopic(topic, 
1, (short) 1))).all().get();
             clusterInstance.waitForTopic(topic, 1);
             boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, 
topic);
-            Assertions.assertTrue(doesTopicExist);
+            assertTrue(doesTopicExist);
         }
     }
 
@@ -92,7 +94,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         try (Admin admin = clusterInstance.createAdminClient()) {
             String topic = "dummy-test-topic";
             boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, 
topic);
-            Assertions.assertFalse(doesTopicExist);
+            assertFalse(doesTopicExist);
         }
     }
 
@@ -139,37 +141,37 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, 
Uuid.randomUuid()),
                                                                                
 0, 100, -1L, 0,
                                                                                
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        Assertions.assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+        assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
 
         RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, 
Uuid.randomUuid()),
                                                                                
 0, 100, -1L, 0,
                                                                                
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        Assertions.assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
+        assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
 
         // `listRemoteLogSegments` will receive an exception as these topic 
partitions are not yet registered.
-        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
-        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
 
         
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
                                                       
Collections.singleton(newFollowerTopicIdPartition));
 
         // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        Assertions.assertTrue(initializationLatch.await(30_000, 
TimeUnit.MILLISECONDS));
-        
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
+        assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
 
         
verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition);
         
verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
-        
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
-        
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
+        
assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
+        
assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
     }
 
     @ClusterTest
     public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() 
{
         TopicIdPartition topicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
-        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
     }
 
     @ClusterTest
@@ -206,8 +208,8 @@ public class TopicBasedRemoteLogMetadataManagerTest {
 
         // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        Assertions.assertTrue(initializationLatch.await(30_000, 
TimeUnit.MILLISECONDS));
-        
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
+        assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
 
         
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
@@ -215,7 +217,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
         Long remoteLogSize = 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);
 
-        Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
+        assertEquals(SEG_SIZE * 6, remoteLogSize);
     }
 
     @ClusterTest
@@ -251,16 +253,16 @@ public class TopicBasedRemoteLogMetadataManagerTest {
 
         // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        Assertions.assertTrue(initializationLatch.await(30_000, 
TimeUnit.MILLISECONDS));
-        
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
+        assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
 
         
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
-        Assertions.assertEquals(SEG_SIZE, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
-        Assertions.assertEquals(SEG_SIZE * 2, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
-        Assertions.assertEquals(SEG_SIZE * 3, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
+        assertEquals(SEG_SIZE, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
+        assertEquals(SEG_SIZE * 2, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
+        assertEquals(SEG_SIZE * 3, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
     }
 
     @ClusterTest
@@ -293,13 +295,13 @@ public class TopicBasedRemoteLogMetadataManagerTest {
 
         // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        Assertions.assertTrue(initializationLatch.await(30_000, 
TimeUnit.MILLISECONDS));
-        
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
+        assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, 
TimeUnit.MILLISECONDS));
 
         
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
-        Assertions.assertEquals(0, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
+        assertEquals(0, 
topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
     }
 
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index b67c316e5e4..d970dd8b68c 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -23,33 +23,39 @@ import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
 import 
org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-//import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_FINISHED;
+import static 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
+import static 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
+
 @Tag("integration")
 @ExtendWith(ClusterTestExtensions.class)
 @ClusterTestDefaults(brokers = 3)
 public class RemoteLogMetadataManagerTest {
     private final ClusterInstance clusterInstance;
-
     private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
-    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int SEG_SIZE = 1048576;
     private static final int BROKER_ID_0 = 0;
     private static final int BROKER_ID_1 = 1;
-
-    private final Time time = new MockTime(1);
+    private final Time time = new SystemTime();
 
     RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
         this.clusterInstance = clusterInstance;
@@ -71,26 +77,25 @@ public class RemoteLogMetadataManagerTest {
             // 1.Create a segment with state COPY_SEGMENT_STARTED, and this 
segment should not be available.
             Map<Integer, Long> segmentLeaderEpochs = 
Collections.singletonMap(0, 101L);
             RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
-            RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
-                                                                               
     time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
+            RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(
+                    segmentId, 101L, 200L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
             // Wait until the segment is added successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
 
             // Search should not return the above segment.
-            
Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 
0, 150).isPresent());
+            assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 
0, 150).isPresent());
 
             // 2.Move that segment to COPY_SEGMENT_FINISHED state and this 
segment should be available.
-            RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-                    Optional.empty(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                    BROKER_ID_1);
+            RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+                    segmentId, time.milliseconds(), Optional.empty(), 
COPY_SEGMENT_FINISHED, BROKER_ID_1);
             // Wait until the segment is updated successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
             RemoteLogSegmentMetadata expectedSegmentMetadata = 
segmentMetadata.createWithUpdates(segmentMetadataUpdate);
 
             // Search should return the above segment.
-            Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = 
remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150);
-            Assertions.assertEquals(Optional.of(expectedSegmentMetadata), 
segmentMetadataForOffset150);
+            Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 =
+                    remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 
150);
+            assertEquals(Optional.of(expectedSegmentMetadata), 
segmentMetadataForOffset150);
         }
     }
 
@@ -111,46 +116,46 @@ public class RemoteLogMetadataManagerTest {
             segmentLeaderEpochs.put(2, 50L);
             segmentLeaderEpochs.put(3, 80L);
             RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
-            RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 100L,
-                                                                               
     -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE,
-                                                                               
     segmentLeaderEpochs);
+            RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(
+                    segmentId, 0L, 100L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
             // Wait until the segment is added successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
 
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
-                    segmentId, time.milliseconds(), Optional.empty(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+                    segmentId, time.milliseconds(), Optional.empty(), 
COPY_SEGMENT_FINISHED, BROKER_ID_1);
             // Wait until the segment is updated successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
 
             RemoteLogSegmentMetadata expectedSegMetadata = 
segmentMetadata.createWithUpdates(segmentMetadataUpdate);
 
             // Check that the segment exists in RLMM.
-            Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = 
remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
-            Assertions.assertEquals(Optional.of(expectedSegMetadata), 
segMetadataForOffset30Epoch1);
+            Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 =
+                    remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 
30L);
+            assertEquals(Optional.of(expectedSegMetadata), 
segMetadataForOffset30Epoch1);
 
             // Mark the partition for deletion and wait for it to be updated 
successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                    
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED)).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
+                    
createRemotePartitionDeleteMetadata(DELETE_PARTITION_MARKED)).get());
 
-            Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark = 
remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
-                                                                               
                                                1, 30L);
-            Assertions.assertEquals(Optional.of(expectedSegMetadata), 
segmentMetadataAfterDelMark);
+            Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark =
+                    remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 
30L);
+            assertEquals(Optional.of(expectedSegMetadata), 
segmentMetadataAfterDelMark);
 
             // Set the partition deletion state as started. Partition and 
segments should still be accessible as they are not
             // yet deleted. Wait until the segment state is updated 
successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                    
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED)).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
+                    
createRemotePartitionDeleteMetadata(DELETE_PARTITION_STARTED)).get());
 
-            Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart = 
remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
-                                                                               
                                                 1, 30L);
-            Assertions.assertEquals(Optional.of(expectedSegMetadata), 
segmentMetadataAfterDelStart);
+            Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart =
+                    remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 
30L);
+            assertEquals(Optional.of(expectedSegMetadata), 
segmentMetadataAfterDelStart);
 
             // Set the partition deletion state as finished. RLMM should clear 
all its internal state for that partition.
             // Wait until the segment state is updated successfully.
-            Assertions.assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                    
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED)).get());
+            assertDoesNotThrow(() -> 
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
+                    
createRemotePartitionDeleteMetadata(DELETE_PARTITION_FINISHED)).get());
 
-            Assertions.assertThrows(RemoteResourceNotFoundException.class,
+            assertThrows(RemoteResourceNotFoundException.class,
                 () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 
1, 30L));
         }
     }

Reply via email to