chickenchickenlove commented on code in PR #21126:
URL: https://github.com/apache/kafka/pull/21126#discussion_r2652475533
##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java:
##########
@@ -70,6 +84,24 @@ public OffsetsForLeaderEpochRequest build(short version) {
             if (version < oldestAllowedVersion() || version > latestAllowedVersion())
                 throw new UnsupportedVersionException("Cannot build " + this + " with version " + version);
+            if (version <= 4) {
+                for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
+                    if (topic.topic() == null || topic.topic().isEmpty()) {
+                        throw new UnsupportedVersionException("The broker offsets for leader api version " +
+                            version + " does require usage of topic names.");
+                    }
+                }
+            }
+
+            if (version >= 5) {
+                for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
+                    if (topic.topicId() == null || topic.topicId() == Uuid.ZERO_UUID) {
Review Comment:
The parsed `Uuid` object might not share the same reference as `Uuid.ZERO_UUID`, so reference comparison with `==` can miss values that are equal. I wonder if comparing them using `equals()` instead of `==` was intended here. What do you think?
For example:
```java
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { ... }
```
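As a quick standalone illustration (using `java.util.UUID` as a stand-in for Kafka's `Uuid`, which behaves the same way for equality), two UUIDs parsed from the same string are equal by value but are distinct objects, so `==` returns false:

```java
import java.util.UUID;

// Demonstration with java.util.UUID as a stand-in for Kafka's Uuid:
// parsing produces a fresh object each time, so reference comparison
// with == fails even when the values are identical.
public class UuidEqualityDemo {
    public static void main(String[] args) {
        UUID a = UUID.fromString("00000000-0000-0000-0000-000000000000");
        UUID b = UUID.fromString("00000000-0000-0000-0000-000000000000");
        System.out.println(a == b);      // false: different references
        System.out.println(a.equals(b)); // true: same value
    }
}
```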
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2117,13 +2117,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
     val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
+    // Separate topics with unknown topic IDs when using version 5+
+    val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+      topics.partition { offsetForLeaderTopic =>
+        metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
+      }
+    } else {
+      (topics, Seq.empty[OffsetForLeaderTopic])
+    }
+
     // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
     // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
     // following a leader change, so we also allow topic describe permission.
     val (authorizedTopics, unauthorizedTopics) =
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
-        (topics, Seq.empty[OffsetForLeaderTopic])
-      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
+        (knownTopics, Seq.empty[OffsetForLeaderTopic])
+      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic =>
+        // Resolve topic name from topicId if needed for authorization
+        if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+          metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()
Review Comment:
Should we consider a race with metadata view updates here? We check for existence first and retrieve the value later. Consider the following sequence:
1. Thread A executes the partitioning logic. `metadataCache.getTopicName(id).isPresent()` returns true for UUID-123, so the topic is added to `knownTopics`.
2. Thread B handles a metadata update (e.g., topic deletion via the Admin API) and removes UUID-123 from `metadataCache`.
3. Thread A proceeds to the authorization block and calls `metadataCache.getTopicName(id).get()`.
At step 3, `get()` throws `NoSuchElementException` because the topic no longer exists in the cache, leading to an uncaught exception.
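A sketch of the safer single-lookup pattern, with a hypothetical `TopicNameCache` standing in for `metadataCache`: resolve the name once and carry the `Optional` forward, so a concurrent removal between the existence check and the retrieval cannot cause a throw:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for metadataCache: maps topic IDs to names.
class TopicNameCache {
    private final Map<UUID, String> names = new HashMap<>();
    void put(UUID id, String name) { names.put(id, name); }
    void remove(UUID id) { names.remove(id); }
    Optional<String> getTopicName(UUID id) { return Optional.ofNullable(names.get(id)); }
}

public class SingleLookupDemo {
    public static void main(String[] args) {
        TopicNameCache cache = new TopicNameCache();
        UUID id = UUID.randomUUID();
        cache.put(id, "events");

        // Racy pattern: isPresent() now, get() later -- a concurrent removal
        // in between makes get() throw NoSuchElementException.
        // Safer pattern: look the name up once and reuse that single result.
        Optional<String> name = cache.getTopicName(id);
        cache.remove(id); // even if the topic vanishes afterwards...
        System.out.println(name.orElse("<unknown>")); // ...the snapshot stays valid
    }
}
```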
##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java:
##########
@@ -51,10 +52,23 @@ public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartiti
         // to clients. Beginning with version 3, the broker requires only TOPIC Describe
         // permission for the topic of each requested partition. In order to ensure client
         // compatibility, we only send this request when we can guarantee the relaxed permissions.
+
+        // Check if all topics have topic IDs. If so, we can use version 5 which requires topic IDs.
+        // Otherwise, use version 4 which uses topic names.
+        boolean canUseTopicId = true;
+        for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : epochsByPartition) {
+            if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
+                canUseTopicId = false;
+                break;
+            }
+        }
+
         OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
         data.setReplicaId(CONSUMER_REPLICA_ID);
         data.setTopics(epochsByPartition);
-        return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
+
+        short latestVersion = canUseTopicId ? (short) 5 : (short) 4;
+        return new Builder((short) 3, latestVersion, data);
Review Comment:
`ApiKeys.OFFSET_FOR_LEADER_EPOCH` could be updated to treat `v5` as the unstable latest version. That way, we can reference the version registered in the enum instead of hard-coding magic numbers like 4/5 here.
For example, we can use
```java
if (canUseTopicId) {
    // with v5 registered as the unstable latest version
    return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(true), data);
} else {
    return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
}
```
If bumping `latestVersion` is too impactful, would it make sense to follow an existing pattern for retrieving the unstable latest version and use that instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]