cadonna commented on code in PR #18827:
URL: https://github.com/apache/kafka/pull/18827#discussion_r1946152012
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
assertEquals(data.activeTasks(), response.activeTasks());
assertEquals(data.standbyTasks(), response.standbyTasks());
assertEquals(data.warmupTasks(), response.warmupTasks());
+
+ assertEquals(data.partitionsByUserEndpoint(),
response.partitionsByUserEndpoint());
+ Map<StreamsAssignmentInterface.HostInfo,
StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap =
streamsAssignmentInterface.partitionsByHost.get();
+ assertEquals(endpointPartitionsMap.size(),
response.partitionsByUserEndpoint().size());
+ StreamsAssignmentInterface.HostInfo hostInfo =
endpointPartitionsMap.keySet().iterator().next();
+ assertEquals(hostInfo.host, endpoint.host());
+ assertEquals(hostInfo.port, endpoint.port());
+ StreamsAssignmentInterface.EndpointPartitions endpointPartitions =
endpointPartitionsMap.get(hostInfo);
Review Comment:
This line seems a remainder of something. `endpointPartitions` is not used,
is it?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -143,6 +143,27 @@ public String toString() {
}
+ public static class EndpointPartitions {
+ public final List<TopicPartition> activePartitions;
+ public final List<TopicPartition> standbyPartitions;
+
+ public EndpointPartitions(final List<TopicPartition> activePartitions,
+ final List<TopicPartition>
standbyPartitions) {
+ this.activePartitions = activePartitions;
+ this.standbyPartitions = standbyPartitions;
+ }
+
+ @Override
+ public String toString() {
+ return "EndpointPartitions {"
+ + "activePartitions=" + activePartitions
+ + ", standbyPartitions=" + standbyPartitions
+ + '}';
+ }
+ }
Review Comment:
On `trunk` we specified this kind of classes with private fields and public
getters. Could you also specify them here this way so that we do not need
rewrite this code when we port it to `trunk`?
I know that we said conceptual reviews on dev and then more detailed reviews
on `trunk` but this is a rather small change that saves us some work.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
assertEquals(data.activeTasks(), response.activeTasks());
assertEquals(data.standbyTasks(), response.standbyTasks());
assertEquals(data.warmupTasks(), response.warmupTasks());
+
+ assertEquals(data.partitionsByUserEndpoint(),
response.partitionsByUserEndpoint());
+ Map<StreamsAssignmentInterface.HostInfo,
StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap =
streamsAssignmentInterface.partitionsByHost.get();
+ assertEquals(endpointPartitionsMap.size(),
response.partitionsByUserEndpoint().size());
+ StreamsAssignmentInterface.HostInfo hostInfo =
endpointPartitionsMap.keySet().iterator().next();
+ assertEquals(hostInfo.host, endpoint.host());
+ assertEquals(hostInfo.port, endpoint.port());
Review Comment:
The parameters here should be the other way around. The expected value comes
first and then the tested value.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
return new CoordinatorResult<>(records, new
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}
+ private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
maybeBuildEndpointToPartitions(StreamsGroup group) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
+ EndpointToPartitionsManager endpointToPartitionsManager = new
EndpointToPartitionsManager();
+ // Build the endpoint to topic partition information
Review Comment:
I see with pleasure that my continuous complaining is yielding results 😈
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]