This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0a9a04a020 MINOR: Cleanups in storage module (#20270)
d0a9a04a020 is described below
commit d0a9a04a0202f2ce6354fd72488aab665d046ee7
Author: Lan Ding <[email protected]>
AuthorDate: Thu Jul 31 20:55:49 2025 +0800
MINOR: Cleanups in storage module (#20270)
Cleanups including:
- Rewrite `FetchCountAndOp` as a record class
- Replace `Tuple` by `Map.Entry`
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../common/runtime/CoordinatorLoaderImplTest.java | 112 ++++++++++-----------
.../tiered/storage/TieredStorageTestBuilder.java | 2 +-
.../tiered/storage/actions/ConsumeAction.java | 9 +-
.../OffloadAndTxnConsumeFromLeaderTest.java | 2 +-
.../tiered/storage/specs/FetchCountAndOp.java | 35 +++++++
.../tiered/storage/specs/RemoteFetchCount.java | 31 ------
.../storage/utils/LocalTieredStorageOutput.java | 12 +--
7 files changed, 102 insertions(+), 101 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 0113fbd657d..8760e9347a1 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -66,14 +67,11 @@ import static org.mockito.Mockito.when;
@Timeout(60)
class CoordinatorLoaderImplTest {
- private record Tuple<K, V>(K key, V value) {
- }
-
- private static class StringKeyValueDeserializer implements
Deserializer<Tuple<String, String>> {
+ private static class StringKeyValueDeserializer implements
Deserializer<Map.Entry<String, String>> {
@Override
- public Tuple<String, String> deserialize(ByteBuffer key, ByteBuffer
value) throws RuntimeException {
- return new Tuple<>(
+ public Map.Entry<String, String> deserialize(ByteBuffer key,
ByteBuffer value) throws RuntimeException {
+ return Map.entry(
StandardCharsets.UTF_8.decode(key).toString(),
StandardCharsets.UTF_8.decode(value).toString()
);
@@ -85,10 +83,10 @@ class CoordinatorLoaderImplTest {
TopicPartition tp = new TopicPartition("foo", 0);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.empty();
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.empty();
- Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ Deserializer<Map.Entry<String, String>> serde =
mock(Deserializer.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -104,10 +102,10 @@ class CoordinatorLoaderImplTest {
TopicPartition tp = new TopicPartition("foo", 0);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(mock(UnifiedLog.class));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.empty();
- Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ Deserializer<Map.Entry<String, String>> serde =
mock(Deserializer.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -125,10 +123,10 @@ class CoordinatorLoaderImplTest {
UnifiedLog log = mock(UnifiedLog.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(9L);
- Deserializer<Tuple<String, String>> serde = new
StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ Deserializer<Map.Entry<String, String>> serde = new
StringKeyValueDeserializer();
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -188,13 +186,13 @@ class CoordinatorLoaderImplTest {
// Includes 7 normal + 2 control (COMMIT, ABORT)
assertEquals(9, summary.numRecords());
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
- verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6",
"v6"));
- verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7",
"v7"));
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+ verify(coordinator).replay(5L, 100L, (short) 5, Map.entry("k6",
"v6"));
+ verify(coordinator).replay(6L, 100L, (short) 5, Map.entry("k7",
"v7"));
verify(coordinator).replayEndTransactionMarker(100L, (short) 5,
TransactionResult.COMMIT);
verify(coordinator).replayEndTransactionMarker(500L, (short) 10,
TransactionResult.ABORT);
verify(coordinator).updateLastWrittenOffset(2L);
@@ -211,10 +209,10 @@ class CoordinatorLoaderImplTest {
UnifiedLog log = mock(UnifiedLog.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(100L);
- Deserializer<Tuple<String, String>> serde = new
StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ Deserializer<Map.Entry<String, String>> serde = new
StringKeyValueDeserializer();
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -257,9 +255,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(2L);
StringKeyValueDeserializer serde =
mock(StringKeyValueDeserializer.class);
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -278,11 +276,11 @@ class CoordinatorLoaderImplTest {
when(serde.deserialize(any(ByteBuffer.class),
any(ByteBuffer.class)))
.thenThrow(new
Deserializer.UnknownRecordTypeException((short) 1))
- .thenReturn(new Tuple<>("k2", "v2"));
+ .thenReturn(Map.entry("k2", "v2"));
loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
}
}
@@ -293,9 +291,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(2L);
StringKeyValueDeserializer serde =
mock(StringKeyValueDeserializer.class);
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -332,9 +330,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(10L);
StringKeyValueDeserializer serde =
mock(StringKeyValueDeserializer.class);
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -359,10 +357,10 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(5L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
MockTime time = new MockTime();
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
time,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -407,9 +405,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(7L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -446,13 +444,13 @@ class CoordinatorLoaderImplTest {
assertNotNull(loader.load(tp, coordinator).get(10,
TimeUnit.SECONDS));
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
- verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
- verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(1)).updateLastWrittenOffset(2L);
verify(coordinator, times(1)).updateLastWrittenOffset(5L);
@@ -470,9 +468,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(0L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -496,9 +494,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= partition -> Optional.of(7L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
@@ -535,13 +533,13 @@ class CoordinatorLoaderImplTest {
assertNotNull(loader.load(tp, coordinator).get(10,
TimeUnit.SECONDS));
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
- verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
- verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(0)).updateLastWrittenOffset(2L);
verify(coordinator, times(0)).updateLastWrittenOffset(5L);
@@ -560,9 +558,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier
= mock(Function.class);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
- CoordinatorPlayback<Tuple<String, String>> coordinator =
mock(CoordinatorPlayback.class);
+ CoordinatorPlayback<Map.Entry<String, String>> coordinator =
mock(CoordinatorPlayback.class);
- try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new
CoordinatorLoaderImpl<>(
+ try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new
CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index b8a75102b9c..db9fd4f9b9b 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -206,7 +206,7 @@ public final class TieredStorageTestBuilder {
RemoteFetchCount remoteFetchRequestCount) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
assertTrue(partition >= 0, "Partition must be >= 0");
-
assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0,
"Expected fetch count from tiered storage must be >= 0");
+ assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().count()
>= 0, "Expected fetch count from tiered storage must be >= 0");
assertFalse(fetchables.containsKey(topicPartition), "Consume already
in progress for " + topicPartition);
fetchables.put(topicPartition, new FetchableSpec(fromBroker,
remoteFetchRequestCount));
return this;
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index f7a83bfe1de..0288718b930 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
@@ -141,16 +142,16 @@ public final class ConsumeAction implements
TieredStorageTestAction {
.orElse(events);
RemoteFetchCount remoteFetchCount =
remoteFetchSpec.remoteFetchCount();
- RemoteFetchCount.FetchCountAndOp expectedCountAndOp = switch
(eventType) {
+ FetchCountAndOp expectedCountAndOp = switch (eventType) {
case FETCH_SEGMENT ->
remoteFetchCount.getSegmentFetchCountAndOp();
case FETCH_OFFSET_INDEX ->
remoteFetchCount.getOffsetIdxFetchCountAndOp();
case FETCH_TIME_INDEX ->
remoteFetchCount.getTimeIdxFetchCountAndOp();
case FETCH_TRANSACTION_INDEX ->
remoteFetchCount.getTxnIdxFetchCountAndOp();
- default -> new RemoteFetchCount.FetchCountAndOp(-1,
RemoteFetchCount.OperationType.EQUALS_TO);
+ default -> new FetchCountAndOp(-1,
RemoteFetchCount.OperationType.EQUALS_TO);
};
- RemoteFetchCount.OperationType exceptedOperationType =
expectedCountAndOp.getOperationType();
- int exceptedCount = expectedCountAndOp.getCount();
+ RemoteFetchCount.OperationType exceptedOperationType =
expectedCountAndOp.operationType();
+ int exceptedCount = expectedCountAndOp.count();
int actualCount = eventsInScope.size();
String message = errorMessage(eventType, actualCount,
exceptedOperationType, exceptedCount);
if (exceptedCount != -1) {
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index ba210722c9c..9f3cd9e3637 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
@@ -28,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
/**
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
new file mode 100644
index 00000000000..5d0b8ff8323
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tiered.storage.specs;
+
+public record FetchCountAndOp(
+ int count,
+ RemoteFetchCount.OperationType operationType
+) {
+ public FetchCountAndOp(int count) {
+ this(count, RemoteFetchCount.OperationType.EQUALS_TO);
+ }
+
+ @Override
+ public String toString() {
+ return "FetchCountAndOp{" +
+ "count=" + count +
+ ", operationType=" + operationType +
+ '}';
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
index 67a1d8e5b49..7f3b10c84b1 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
@@ -85,35 +85,4 @@ public class RemoteFetchCount {
GREATER_THAN_OR_EQUALS_TO,
LESS_THAN_OR_EQUALS_TO
}
-
- public static class FetchCountAndOp {
- private final int count;
- private final OperationType operationType;
-
- public FetchCountAndOp(int count) {
- this.count = count;
- this.operationType = OperationType.EQUALS_TO;
- }
-
- public FetchCountAndOp(int count, OperationType operationType) {
- this.count = count;
- this.operationType = operationType;
- }
-
- public int getCount() {
- return count;
- }
-
- public OperationType getOperationType() {
- return operationType;
- }
-
- @Override
- public String toString() {
- return "FetchCountAndOp{" +
- "count=" + count +
- ", operationType=" + operationType +
- '}';
- }
- }
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
index 505f3c4dd7c..68cf4178eea 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import static
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
@@ -67,15 +68,15 @@ public final class LocalTieredStorageOutput<K, V>
implements LocalTieredStorageT
if (records.isEmpty()) {
output += row(segFilename, -1, "");
} else {
- List<Tuple2<Long, String>> offsetKeyValues = records
+ List<Map.Entry<Long, String>> offsetKeyValues = records
.stream()
- .map(record -> new Tuple2<>(record.offset(),
+ .map(record -> Map.entry(record.offset(),
"(" + des(keyDe, record.key()) + ", " +
des(valueDe, record.value()) + ")"))
.toList();
- output += row(segFilename, offsetKeyValues.get(0).t1,
offsetKeyValues.get(0).t2);
+ output += row(segFilename, offsetKeyValues.get(0).getKey(),
offsetKeyValues.get(0).getValue());
if (offsetKeyValues.size() > 1) {
offsetKeyValues.subList(1,
records.size()).forEach(offsetKeyValue ->
- output += row("", offsetKeyValue.t1,
offsetKeyValue.t2));
+ output += row("", offsetKeyValue.getKey(),
offsetKeyValue.getValue()));
}
}
output += row();
@@ -91,7 +92,4 @@ public final class LocalTieredStorageOutput<K, V> implements
LocalTieredStorageT
private String des(Deserializer<?> de, ByteBuffer bytes) {
return de.deserialize(currentTopic,
Utils.toNullableArray(bytes)).toString();
}
-
- private record Tuple2<T1, T2>(T1 t1, T2 t2) {
- }
}