dajac commented on code in PR #19856:
URL: https://github.com/apache/kafka/pull/19856#discussion_r2121092739
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -142,7 +142,7 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro
private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
Review Comment:
Let's extend unit tests to cover the rack field.
##########
clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
##########
@@ -73,6 +74,16 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
this.groupInstanceId = Optional.empty();
}
+ // The WorkerGroupMember in connect module also uses this class, but there is no client.rack in DistributedConfig.
+ // Ignore the rackId in that case to avoid ConfigException.
+ // The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
+ // Skip empty rackId to avoid InvalidRequestException.
+ if (config.values().containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG) && !config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty()) {
+ this.rackId = Optional.ofNullable(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
+ } else {
+ this.rackId = Optional.empty();
+ }
Review Comment:
- Should we only do this if the protocolType is CONSUMER?
- It may be simpler to read the config into a local variable and set
this.rackId only if it is non-null and non-empty.
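A minimal sketch of the two suggestions above, written against stand-ins rather than the real Kafka classes: a plain Map takes the place of AbstractConfig.values(), and the RackIdSketch class, its ProtocolType enum and the resolveRackId helper are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.Optional;

public class RackIdSketch {
    // Stand-in for CommonClientConfigs.CLIENT_RACK_CONFIG.
    static final String CLIENT_RACK_CONFIG = "client.rack";

    // Hypothetical stand-in for the protocol type passed to GroupRebalanceConfig.
    enum ProtocolType { CONSUMER, CONNECT }

    // Hypothetical helper: reads the rack once into a local variable and
    // keeps it only for CONSUMER when it is a non-null, non-empty string.
    static Optional<String> resolveRackId(Map<String, ?> values, ProtocolType protocolType) {
        if (protocolType != ProtocolType.CONSUMER) {
            return Optional.empty();
        }
        // Map.get returns null when the key is absent, so no containsKey check is needed.
        Object rack = values.get(CLIENT_RACK_CONFIG);
        if (rack instanceof String && !((String) rack).isEmpty()) {
            return Optional.of((String) rack);
        }
        return Optional.empty();
    }
}
```

With this shape, a missing key, a null value and an empty string all take the same path to Optional.empty().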
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -262,6 +262,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
// InstanceId - set if present
membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
+ // RackId - set if present
+ membershipManager.rackId().ifPresent(data::setRackId);
Review Comment:
I think that it should only be set if it has changed since the last
request or if we send a full request.
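One way to picture this, reading the comment as "include the field only when its value changed since the last request, or when a full request is sent". This is a standalone sketch, not the actual ConsumerHeartbeatRequestManager code; HeartbeatFieldSketch, rackIdToSend and the sendAllFields flag are hypothetical names.

```java
import java.util.Objects;
import java.util.Optional;

public class HeartbeatFieldSketch {
    // Last rack id that was included in a request; null means nothing sent yet.
    private String lastSentRackId = null;

    // Returns the rack id to include in the next request, or empty to omit it.
    // The field is included on a full request or when the value has changed.
    Optional<String> rackIdToSend(Optional<String> currentRackId, boolean sendAllFields) {
        String current = currentRackId.orElse(null);
        if (current != null && (sendAllFields || !Objects.equals(current, lastSentRackId))) {
            lastSentRackId = current;
            return Optional.of(current);
        }
        return Optional.empty();
    }
}
```

The caller would then do something like rackIdToSend(...).ifPresent(data::setRackId), so an unchanged rack id is left out of incremental heartbeats.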
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -216,6 +222,92 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
}
}
+ @ClusterTest(
Review Comment:
I wonder whether we could add another test which verifies that a rebalance
is triggered when the racks of a partition have changed. Have you considered it?