lucasbru commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2929765220
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -285,9 +285,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (useTopicIds) {
offsetCommitRequest.data.topics.forEach { topic =>
- if (topic.topicId != Uuid.ZERO_UUID) {
- metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
- }
+ metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
+ }
+ } else {
+ offsetCommitRequest.data.topics.forEach { topic =>
+ topic.setTopicId(metadataCache.getTopicId(topic.name))
Review Comment:
Don't we need to do the same for transactional offset commits?
Also, this lookup may fail (if the topic was deleted), so we would still
have to handle the missing topic ID later on. I'd just fail the assignment
epoch validation if the topic ID is missing (ZERO_UUID). I suppose there is
also a discussion to be had about throwing an error in this case, but it
seems that would belong in a different PR. cc @dajac
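
A minimal sketch of the fallback suggested above, under stated assumptions: `java.util.UUID` stands in for Kafka's `Uuid`, and the class `TopicIdResolutionSketch`, the map-based lookup, and the method names `resolveTopicId` and `canValidateAssignmentEpoch` are all hypothetical and not part of the PR. It only illustrates that a failed lookup leaves the ID at the zero value and that the later validation treats that value as a failure.

```java
import java.util.Map;
import java.util.UUID;

public class TopicIdResolutionSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID (assumption: used here only to
    // keep the sketch free of Kafka dependencies).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical lookup: an unknown topic (for example, one that was
    // deleted) resolves to ZERO_UUID instead of throwing.
    static UUID resolveTopicId(Map<String, UUID> topicIdsByName, String topicName) {
        return topicIdsByName.getOrDefault(topicName, ZERO_UUID);
    }

    // Hypothetical check: the assignment epoch validation can only proceed
    // when the topic ID was resolved; a ZERO_UUID means it must fail.
    static boolean canValidateAssignmentEpoch(UUID topicId) {
        return !ZERO_UUID.equals(topicId);
    }
}
```

With this shape, the resolution step in the request handler never has to throw; the decision to reject the commit stays in the validation step.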
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -1081,6 +1081,7 @@ private void cancelGroupSizeCounter() {
public void onLoaded(CoordinatorMetadataImage newImage) {
CoordinatorMetadataDelta emptyDelta = newImage.emptyDelta();
groupMetadataManager.onMetadataUpdate(emptyDelta, newImage);
+ offsetMetadataManager.onMetadataUpdate(emptyDelta, newImage);
Review Comment:
Seems this can be removed
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
Review Comment:
Here we are always passing ZERO_UUID.
Doesn't that mean that the offset commit resolution in KafkaApis doesn't
help?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,26 @@ public CommitPartitionValidator validateOffsetCommit(
"by members using the modern group protocol");
Review Comment:
Side note: "the modern group protocol" isn't really a thing. I think we may
want to change this to "consumer group protocol". Not your code, but maybe we
can fix it on the side.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -46,9 +46,9 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
Review Comment:
It seems to me all the changes in this file can be reverted.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,14 +940,181 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
Review Comment:
The code handles classic members differently (falling through to the old
commit logic). Can we add a test for that?
Also, a test for the case memberEpoch > broker-side memberEpoch.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -84,7 +85,7 @@ public static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private GroupMetadataManager groupMetadataManager = null;
- private MetadataImage metadataImage = null;
+ private CoordinatorMetadataImage metadataImage = null;
Review Comment:
Why do we add a metadataImage and do not use it?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -285,9 +285,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (useTopicIds) {
offsetCommitRequest.data.topics.forEach { topic =>
- if (topic.topicId != Uuid.ZERO_UUID) {
Review Comment:
We have removed this check. I'm not sure why we had it (can the topic ID be
ZERO_UUID when we use topic IDs?), but maybe we should keep it if we are not
sure?