This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32c2383bfad KAFKA-19658 Tweak
org.apache.kafka.clients.consumer.OffsetAndMetadata (#20451)
32c2383bfad is described below
commit 32c2383bfad0eddc7d10fd3927981673bbaa1b41
Author: Lan Ding <[email protected]>
AuthorDate: Fri Sep 5 06:06:08 2025 +0800
KAFKA-19658 Tweak org.apache.kafka.clients.consumer.OffsetAndMetadata
(#20451)
1. Optimize the `equals()`, `hashCode()`, and `toString()` methods in
`OffsetAndMetadata`.
2. Add UT and IT to these modifications.
Reviewers: TengYao Chi <[email protected]>, Sean Quah
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/PlaintextConsumerTest.java | 13 +++++++++----
.../kafka/clients/consumer/OffsetAndMetadata.java | 17 +++++++++--------
.../kafka/clients/consumer/OffsetAndMetadataTest.java | 15 +++++++++++++++
.../consumer/internals/CommitRequestManagerTest.java | 3 +++
4 files changed, 36 insertions(+), 12 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 5fd2ad20089..c69c9c35fd4 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -553,14 +553,19 @@ public class PlaintextConsumerTest {
// commit sync and verify onCommit is called
var commitCountBefore =
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
- consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
- assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+ consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L,
"metadata")));
+ OffsetAndMetadata metadata =
consumer.committed(Set.of(TP)).get(TP);
+ assertEquals(2, metadata.offset());
+ assertEquals("metadata", metadata.metadata());
assertEquals(commitCountBefore + 1,
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
// commit async and verify onCommit is called
- var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+ var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
- assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+ metadata = consumer.committed(Set.of(TP)).get(TP);
+ assertEquals(5, metadata.offset());
+ // null metadata will be converted to an empty string
+ assertEquals("", metadata.metadata());
assertEquals(commitCountBefore + 2,
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
}
// cleanup
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index d6b3b947c20..f459dd5ba55 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -54,10 +54,7 @@ public class OffsetAndMetadata implements Serializable {
// The server converts null metadata to an empty string. So we store
it as an empty string as well on the client
// to be consistent.
- if (metadata == null)
- this.metadata = OffsetFetchResponse.NO_METADATA;
- else
- this.metadata = metadata;
+ this.metadata = Objects.requireNonNullElse(metadata,
OffsetFetchResponse.NO_METADATA);
}
/**
@@ -82,6 +79,11 @@ public class OffsetAndMetadata implements Serializable {
return offset;
}
+ /**
+ * Get the metadata of the previously consumed record.
+ *
+ * @return the metadata or empty string if no metadata
+ */
public String metadata() {
return metadata;
}
@@ -106,21 +108,20 @@ public class OffsetAndMetadata implements Serializable {
OffsetAndMetadata that = (OffsetAndMetadata) o;
return offset == that.offset &&
Objects.equals(metadata, that.metadata) &&
- Objects.equals(leaderEpoch, that.leaderEpoch);
+ Objects.equals(leaderEpoch(), that.leaderEpoch());
}
@Override
public int hashCode() {
- return Objects.hash(offset, metadata, leaderEpoch);
+ return Objects.hash(offset, metadata, leaderEpoch());
}
@Override
public String toString() {
return "OffsetAndMetadata{" +
"offset=" + offset +
- ", leaderEpoch=" + leaderEpoch +
+ ", leaderEpoch=" + leaderEpoch().orElse(null) +
", metadata='" + metadata + '\'' +
'}';
}
-
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
index 3035703ff37..c1a13c054ee 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
@@ -65,4 +65,19 @@ public class OffsetAndMetadataTest {
assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit
metadata"), deserializedObject);
}
+ @Test
+ public void testEqualsWithNullAndNegativeLeaderEpoch() {
+ OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L,
Optional.empty(), "metadata");
+ OffsetAndMetadata metadataWithNegativeEpoch = new
OffsetAndMetadata(100L, Optional.of(-1), "metadata");
+ assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch);
+ assertEquals(metadataWithNullEpoch.hashCode(),
metadataWithNegativeEpoch.hashCode());
+ }
+
+ @Test
+ public void testEqualsWithNullAndEmptyMetadata() {
+ OffsetAndMetadata metadataWithNullMetadata = new
OffsetAndMetadata(100L, Optional.of(1), null);
+ OffsetAndMetadata metadataWithEmptyMetadata = new
OffsetAndMetadata(100L, Optional.of(1), "");
+ assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata);
+ assertEquals(metadataWithNullMetadata.hashCode(),
metadataWithEmptyMetadata.hashCode());
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index d4ceeedde56..7032c13b285 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -766,6 +766,7 @@ public class CommitRequestManagerTest {
// Complete request with a response
long expectedOffset = 100;
+ String expectedMetadata = "metadata";
NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(DEFAULT_GROUP_ID)
@@ -777,6 +778,7 @@ public class CommitRequestManagerTest {
.setPartitionIndex(tp.partition())
.setCommittedOffset(expectedOffset)
.setCommittedLeaderEpoch(1)
+ .setMetadata(expectedMetadata)
))
));
req.handler().onComplete(buildOffsetFetchClientResponse(req,
groupResponse, false));
@@ -794,6 +796,7 @@ public class CommitRequestManagerTest {
assertEquals(1, offsetsAndMetadata.size());
assertTrue(offsetsAndMetadata.containsKey(tp));
assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
+ assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata());
assertEquals(0,
commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
"request should be removed from the queue when a response is
received.");
}