chia7712 commented on code in PR #17444:
URL: https://github.com/apache/kafka/pull/17444#discussion_r1813529565
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java:
##########
@@ -98,6 +122,7 @@ public int hashCode() {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + numPartitions;
+ result = 31 * result + partitionRacks.hashCode();
Review Comment:
We should update `equals` to match the change to `hashCode`, right?
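For illustration, here is a minimal sketch of what keeping the two methods in sync could look like. `TopicMetadataSketch` is a hypothetical stand-in, not the real class: it uses a plain `String` for `id` and only the four fields visible in the hunk above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for TopicMetadata; field names follow the hunk above,
// but the types are simplified (the id is a plain String here).
final class TopicMetadataSketch {
    private final String id;
    private final String name;
    private final int numPartitions;
    private final Map<Integer, Set<String>> partitionRacks;

    TopicMetadataSketch(String id, String name, int numPartitions,
                        Map<Integer, Set<String>> partitionRacks) {
        this.id = Objects.requireNonNull(id);
        this.name = Objects.requireNonNull(name);
        this.numPartitions = numPartitions;
        this.partitionRacks = Objects.requireNonNull(partitionRacks);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TopicMetadataSketch that = (TopicMetadataSketch) o;
        // Compare exactly the fields that hashCode() mixes in.
        return numPartitions == that.numPartitions
            && id.equals(that.id)
            && name.equals(that.name)
            && partitionRacks.equals(that.partitionRacks);
    }

    @Override
    public int hashCode() {
        int result = id.hashCode();
        result = 31 * result + name.hashCode();
        result = 31 * result + numPartitions;
        result = 31 * result + partitionRacks.hashCode();
        return result;
    }
}
```

The point is only that every field feeding `hashCode` also takes part in `equals`, so equal objects always hash alike and a rack change makes two instances unequal.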
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -388,10 +386,25 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
+ Map<Integer, Set<String>> partitionRacks = new HashMap<>();
+ topicImage.partitions().forEach((partition, partitionRegistration) -> {
+ Set<String> racks = new HashSet<>();
+ for (int replica : partitionRegistration.replicas) {
+ Optional<String> rackOptional = clusterImage.broker(replica).rack();
Review Comment:
On another note, moving a partition can trigger a rack change, correct? If
so, could this cause issues for users who have deployed an auto-rebalance tool?
It might lead to a series of rebalances.
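To make the concern concrete, here is a rough sketch of how a per-partition rack set reacts to a replica move. `RackSetSketch`, `racksOf`, and the broker-to-rack map are hypothetical stand-ins for the `clusterImage` lookup in the hunk above, and treating an unknown broker as rackless is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper mirroring the loop in the hunk above, with the cluster
// image replaced by a plain broker-id -> rack map.
final class RackSetSketch {
    static Set<String> racksOf(int[] replicas, Map<Integer, Optional<String>> brokerRacks) {
        Set<String> racks = new HashSet<>();
        for (int replica : replicas) {
            // Assumption: a broker missing from the map contributes no rack.
            brokerRacks.getOrDefault(replica, Optional.empty()).ifPresent(racks::add);
        }
        return racks;
    }
}
```

With brokers 1, 2, 3 in racks a, b, c, moving a replica from broker 2 to broker 3 changes the set from {a, b} to {a, c}, so the metadata would compare as different. A move between two brokers in the same rack leaves the set unchanged.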
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -388,10 +386,25 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
+ Map<Integer, Set<String>> partitionRacks = new HashMap<>();
+ topicImage.partitions().forEach((partition, partitionRegistration) -> {
+ Set<String> racks = new HashSet<>();
+ for (int replica : partitionRegistration.replicas) {
+ Optional<String> rackOptional = clusterImage.broker(replica).rack();
Review Comment:
This seems more like an optimization than a bug fix, correct? No data is
lost even if we don't trigger a rebalance. Also, this rebalance trigger
isn't mentioned in
[KIP-848](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-GroupEpoch-Triggerarebalance).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1815,18 +1814,18 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
numMembers
);
- if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ long subscriptionMetadataHash = Utils.hashSubscriptionMetadata(subscriptionMetadata);
+ if (subscriptionMetadataHash != group.subscribedTopicMetadataHash()) {
Review Comment:
Should we align the naming? Either `subscriptionMetadataHash` or
`subscribedTopicMetadataHash`, but not both.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -388,10 +386,25 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
+ Map<Integer, Set<String>> partitionRacks = new HashMap<>();
Review Comment:
Maybe we can set the initial capacity based on `topicImage.partitions().size()`?
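One caveat, sketched below: the `HashMap(int)` constructor takes a capacity, not an expected entry count, so passing the partition count directly can still trigger a resize once the default load factor of 0.75 is exceeded. `PresizedMaps`, `capacityFor`, and `newMapFor` are hypothetical names for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not an existing Kafka utility.
final class PresizedMaps {
    // Smallest capacity request that keeps expectedSize entries under the
    // default load factor (0.75), so filling the map does not resize it.
    static int capacityFor(int expectedSize) {
        return (int) (expectedSize / 0.75f) + 1;
    }

    static <K, V> Map<K, V> newMapFor(int expectedSize) {
        return new HashMap<>(capacityFor(expectedSize));
    }
}
```

On Java 19 and later, `HashMap.newHashMap(int numMappings)` does this calculation for you, assuming the module's target Java version allows it.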