This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0a9a04a020 MINOR: Cleanups in storage module (#20270)
d0a9a04a020 is described below

commit d0a9a04a0202f2ce6354fd72488aab665d046ee7
Author: Lan Ding <[email protected]>
AuthorDate: Thu Jul 31 20:55:49 2025 +0800

    MINOR: Cleanups in storage module (#20270)
    
    Cleanups including:
    
    - Rewrite `FetchCountAndOp`  as a record class
    - Replace `Tuple` by `Map.Entry`
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorLoaderImplTest.java  | 112 ++++++++++-----------
 .../tiered/storage/TieredStorageTestBuilder.java   |   2 +-
 .../tiered/storage/actions/ConsumeAction.java      |   9 +-
 .../OffloadAndTxnConsumeFromLeaderTest.java        |   2 +-
 .../tiered/storage/specs/FetchCountAndOp.java      |  35 +++++++
 .../tiered/storage/specs/RemoteFetchCount.java     |  31 ------
 .../storage/utils/LocalTieredStorageOutput.java    |  12 +--
 7 files changed, 102 insertions(+), 101 deletions(-)

diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 0113fbd657d..8760e9347a1 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -66,14 +67,11 @@ import static org.mockito.Mockito.when;
 @Timeout(60)
 class CoordinatorLoaderImplTest {
 
-    private record Tuple<K, V>(K key, V value) {
-    }
-
-    private static class StringKeyValueDeserializer implements 
Deserializer<Tuple<String, String>> {
+    private static class StringKeyValueDeserializer implements 
Deserializer<Map.Entry<String, String>> {
 
         @Override
-        public Tuple<String, String> deserialize(ByteBuffer key, ByteBuffer 
value) throws RuntimeException {
-            return new Tuple<>(
+        public Map.Entry<String, String> deserialize(ByteBuffer key, 
ByteBuffer value) throws RuntimeException {
+            return Map.entry(
                     StandardCharsets.UTF_8.decode(key).toString(),
                     StandardCharsets.UTF_8.decode(value).toString()
             );
@@ -85,10 +83,10 @@ class CoordinatorLoaderImplTest {
         TopicPartition tp = new TopicPartition("foo", 0);
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.empty();
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.empty();
-        Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        Deserializer<Map.Entry<String, String>> serde = 
mock(Deserializer.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -104,10 +102,10 @@ class CoordinatorLoaderImplTest {
         TopicPartition tp = new TopicPartition("foo", 0);
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(mock(UnifiedLog.class));
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.empty();
-        Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        Deserializer<Map.Entry<String, String>> serde = 
mock(Deserializer.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -125,10 +123,10 @@ class CoordinatorLoaderImplTest {
         UnifiedLog log = mock(UnifiedLog.class);
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(9L);
-        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        Deserializer<Map.Entry<String, String>> serde = new 
StringKeyValueDeserializer();
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -188,13 +186,13 @@ class CoordinatorLoaderImplTest {
             // Includes 7 normal + 2 control (COMMIT, ABORT)
             assertEquals(9, summary.numRecords());
 
-            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
-            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
-            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
-            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
-            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
-            verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6", 
"v6"));
-            verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7", 
"v7"));
+            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+            verify(coordinator).replay(5L, 100L, (short) 5, Map.entry("k6", 
"v6"));
+            verify(coordinator).replay(6L, 100L, (short) 5, Map.entry("k7", 
"v7"));
             verify(coordinator).replayEndTransactionMarker(100L, (short) 5, 
TransactionResult.COMMIT);
             verify(coordinator).replayEndTransactionMarker(500L, (short) 10, 
TransactionResult.ABORT);
             verify(coordinator).updateLastWrittenOffset(2L);
@@ -211,10 +209,10 @@ class CoordinatorLoaderImplTest {
         UnifiedLog log = mock(UnifiedLog.class);
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(100L);
-        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        Deserializer<Map.Entry<String, String>> serde = new 
StringKeyValueDeserializer();
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -257,9 +255,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(2L);
         StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -278,11 +276,11 @@ class CoordinatorLoaderImplTest {
 
             when(serde.deserialize(any(ByteBuffer.class), 
any(ByteBuffer.class)))
                     .thenThrow(new 
Deserializer.UnknownRecordTypeException((short) 1))
-                    .thenReturn(new Tuple<>("k2", "v2"));
+                    .thenReturn(Map.entry("k2", "v2"));
 
             loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
 
-            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
         }
     }
 
@@ -293,9 +291,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(2L);
         StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -332,9 +330,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(10L);
         StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -359,10 +357,10 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(5L);
         StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
         MockTime time = new MockTime();
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 time,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -407,9 +405,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(7L);
         StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -446,13 +444,13 @@ class CoordinatorLoaderImplTest {
 
             assertNotNull(loader.load(tp, coordinator).get(10, 
TimeUnit.SECONDS));
 
-            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
-            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
-            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
-            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
-            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
-            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
-            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
             verify(coordinator, times(0)).updateLastWrittenOffset(0L);
             verify(coordinator, times(1)).updateLastWrittenOffset(2L);
             verify(coordinator, times(1)).updateLastWrittenOffset(5L);
@@ -470,9 +468,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(0L);
         StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -496,9 +494,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(7L);
         StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
@@ -535,13 +533,13 @@ class CoordinatorLoaderImplTest {
 
             assertNotNull(loader.load(tp, coordinator).get(10, 
TimeUnit.SECONDS));
 
-            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
-            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
-            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
-            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
-            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
-            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
-            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
             verify(coordinator, times(0)).updateLastWrittenOffset(0L);
             verify(coordinator, times(0)).updateLastWrittenOffset(2L);
             verify(coordinator, times(0)).updateLastWrittenOffset(5L);
@@ -560,9 +558,9 @@ class CoordinatorLoaderImplTest {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= mock(Function.class);
         StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
-        CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
-        try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new 
CoordinatorLoaderImpl<>(
                 Time.SYSTEM,
                 partitionLogSupplier,
                 partitionLogEndOffsetSupplier,
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index b8a75102b9c..db9fd4f9b9b 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -206,7 +206,7 @@ public final class TieredStorageTestBuilder {
                                                                  
RemoteFetchCount remoteFetchRequestCount) {
         TopicPartition topicPartition = new TopicPartition(topic, partition);
         assertTrue(partition >= 0, "Partition must be >= 0");
-        
assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, 
"Expected fetch count from tiered storage must be >= 0");
+        assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().count() 
>= 0, "Expected fetch count from tiered storage must be >= 0");
         assertFalse(fetchables.containsKey(topicPartition), "Consume already 
in progress for " + topicPartition);
         fetchables.put(topicPartition, new FetchableSpec(fromBroker, 
remoteFetchRequestCount));
         return this;
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index f7a83bfe1de..0288718b930 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
 
@@ -141,16 +142,16 @@ public final class ConsumeAction implements 
TieredStorageTestAction {
                     .orElse(events);
 
             RemoteFetchCount remoteFetchCount = 
remoteFetchSpec.remoteFetchCount();
-            RemoteFetchCount.FetchCountAndOp expectedCountAndOp = switch 
(eventType) {
+            FetchCountAndOp expectedCountAndOp = switch (eventType) {
                 case FETCH_SEGMENT -> 
remoteFetchCount.getSegmentFetchCountAndOp();
                 case FETCH_OFFSET_INDEX -> 
remoteFetchCount.getOffsetIdxFetchCountAndOp();
                 case FETCH_TIME_INDEX -> 
remoteFetchCount.getTimeIdxFetchCountAndOp();
                 case FETCH_TRANSACTION_INDEX -> 
remoteFetchCount.getTxnIdxFetchCountAndOp();
-                default -> new RemoteFetchCount.FetchCountAndOp(-1, 
RemoteFetchCount.OperationType.EQUALS_TO);
+                default -> new FetchCountAndOp(-1, 
RemoteFetchCount.OperationType.EQUALS_TO);
             };
 
-            RemoteFetchCount.OperationType exceptedOperationType = 
expectedCountAndOp.getOperationType();
-            int exceptedCount = expectedCountAndOp.getCount();
+            RemoteFetchCount.OperationType exceptedOperationType = 
expectedCountAndOp.operationType();
+            int exceptedCount = expectedCountAndOp.count();
             int actualCount = eventsInScope.size();
             String message = errorMessage(eventType, actualCount, 
exceptedOperationType, exceptedCount);
             if (exceptedCount != -1) {
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index ba210722c9c..9f3cd9e3637 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
 import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 
@@ -28,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
 import static 
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
 
 /**
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
new file mode 100644
index 00000000000..5d0b8ff8323
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tiered.storage.specs;
+
+public record FetchCountAndOp(
+    int count,
+    RemoteFetchCount.OperationType operationType
+) {
+    public FetchCountAndOp(int count) {
+        this(count, RemoteFetchCount.OperationType.EQUALS_TO);
+    }
+
+    @Override
+    public String toString() {
+        return "FetchCountAndOp{" +
+            "count=" + count +
+            ", operationType=" + operationType +
+            '}';
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
index 67a1d8e5b49..7f3b10c84b1 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
@@ -85,35 +85,4 @@ public class RemoteFetchCount {
         GREATER_THAN_OR_EQUALS_TO,
         LESS_THAN_OR_EQUALS_TO
     }
-
-    public static class FetchCountAndOp {
-        private final int count;
-        private final OperationType operationType;
-
-        public FetchCountAndOp(int count) {
-            this.count = count;
-            this.operationType = OperationType.EQUALS_TO;
-        }
-
-        public FetchCountAndOp(int count, OperationType operationType) {
-            this.count = count;
-            this.operationType = operationType;
-        }
-
-        public int getCount() {
-            return count;
-        }
-
-        public OperationType getOperationType() {
-            return operationType;
-        }
-
-        @Override
-        public String toString() {
-            return "FetchCountAndOp{" +
-                    "count=" + count +
-                    ", operationType=" + operationType +
-                    '}';
-        }
-    }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
index 505f3c4dd7c..68cf4178eea 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
 
@@ -67,15 +68,15 @@ public final class LocalTieredStorageOutput<K, V> 
implements LocalTieredStorageT
             if (records.isEmpty()) {
                 output += row(segFilename, -1, "");
             } else {
-                List<Tuple2<Long, String>> offsetKeyValues = records
+                List<Map.Entry<Long, String>> offsetKeyValues = records
                         .stream()
-                        .map(record -> new Tuple2<>(record.offset(),
+                        .map(record -> Map.entry(record.offset(),
                                 "(" + des(keyDe, record.key()) + ", " + 
des(valueDe, record.value()) + ")"))
                         .toList();
-                output += row(segFilename, offsetKeyValues.get(0).t1, 
offsetKeyValues.get(0).t2);
+                output += row(segFilename, offsetKeyValues.get(0).getKey(), 
offsetKeyValues.get(0).getValue());
                 if (offsetKeyValues.size() > 1) {
                     offsetKeyValues.subList(1, 
records.size()).forEach(offsetKeyValue ->
-                            output += row("", offsetKeyValue.t1, 
offsetKeyValue.t2));
+                            output += row("", offsetKeyValue.getKey(), 
offsetKeyValue.getValue()));
                 }
             }
             output += row();
@@ -91,7 +92,4 @@ public final class LocalTieredStorageOutput<K, V> implements 
LocalTieredStorageT
     private String des(Deserializer<?> de, ByteBuffer bytes) {
         return de.deserialize(currentTopic, 
Utils.toNullableArray(bytes)).toString();
     }
-
-    private record Tuple2<T1, T2>(T1 t1, T2 t2) {
-    }
 }

Reply via email to