dajac commented on code in PR #18141:
URL: https://github.com/apache/kafka/pull/18141#discussion_r1880819104
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>,
Optional<Collection<MemberAssignment
consumer.clientId(),
consumer.groupInstanceId().orElse(""),
consumer.assignment().topicPartitions().size(),
- new ArrayList<>(verbose ?
consumer.assignment().topicPartitions() : Collections.emptySet())
+ new ArrayList<>(verbose ?
consumer.assignment().topicPartitions() : Collections.emptySet()),
Review Comment:
Do we really need to take `verbose` into consideration here? It is a bit
weird to consider it here but not for the others.
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##########
@@ -536,13 +703,15 @@ public void
testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance c
List<String> cgcArgs = new
ArrayList<>(Arrays.asList("--bootstrap-server",
clusterInstance.bootstrapServers(), "--describe", "--group", group));
cgcArgs.addAll(describeType);
// run two consumers in the group consuming from a
single-partition topic
- try (AutoCloseable protocolConsumerGroupExecutor =
consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
+ try (AutoCloseable protocolConsumerGroupExecutor =
consumerGroupClosable(groupProtocol, group, Set.of(topic),
Collections.emptyMap(), 2);
Review Comment:
nit: It looks like there is an overload which accepts `topic`. Hence we
don't need to change all of them to use `Set.of(topic)`.
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>,
Optional<Collection<MemberAssignment
consumer.clientId(),
consumer.groupInstanceId().orElse(""),
consumer.assignment().topicPartitions().size(),
- new ArrayList<>(verbose ?
consumer.assignment().topicPartitions() : Collections.emptySet())
+ new ArrayList<>(verbose ?
consumer.assignment().topicPartitions() : Collections.emptySet()),
+ consumer.targetAssignment().isEmpty() ? List.of() :
new ArrayList<>(consumer.targetAssignment().get().topicPartitions()),
Review Comment:
nit: You may be able to write it as follows:
`consumer.targetAssignment().map(a ->
a.topicPartitions().stream().collect(Collectors.toUnmodifiableList())).orElse(Collections.emptyList()),`.
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
memberAssignment.host,
memberAssignment.clientId, memberAssignment.numPartitions);
}
if (verbose) {
- String partitions;
-
- if (memberAssignment.assignment.isEmpty())
- partitions = MISSING_COLUMN_VALUE;
- else {
- Map<String, List<TopicPartition>> grouped =
new HashMap<>();
- memberAssignment.assignment.forEach(
- tp -> grouped.computeIfAbsent(tp.topic(),
key -> new ArrayList<>()).add(tp));
- partitions =
grouped.values().stream().map(topicPartitions ->
-
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
"(", ")"))
- ).sorted().collect(Collectors.joining(", "));
- }
- System.out.printf("%s", partitions);
+ String currentEpoch =
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetEpoch =
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ String upgraded =
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ System.out.printf(verboseFormat, currentEpoch,
currentAssignment, targetEpoch, targetAssignment, upgraded);
}
System.out.println();
}
}
});
}
- private void printStates(Map<String, GroupInformation> states) {
+ private String getAssignmentString(List<TopicPartition> assignment) {
+ Map<String, List<TopicPartition>> grouped = new HashMap<>();
+ assignment.forEach(
+ tp -> grouped.computeIfAbsent(tp.topic(), key -> new
ArrayList<>()).add(tp));
+ return grouped.entrySet().stream().map(entry -> {
+ String topicName = entry.getKey();
+ List<TopicPartition> topicPartitions = entry.getValue();
+ return
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
topicName + ":", ""));
+ }).sorted().collect(Collectors.joining(";"));
+ }
+
+ private void printStates(Map<String, GroupInformation> states, boolean
verbose) {
states.forEach((groupId, state) -> {
if (shouldPrintMemberState(groupId,
Optional.of(state.groupState), Optional.of(1))) {
String coordinator = state.coordinator.host() + ":" +
state.coordinator.port() + " (" + state.coordinator.idString() + ")";
int coordinatorColLen = Math.max(25, coordinator.length());
+ int groupColLen = Math.max(15, state.group.length());
- String format = "\n%" + -coordinatorColLen + "s %-25s
%-20s %-15s %s";
+ String assignmentStrategy =
state.assignmentStrategy.isEmpty() ? MISSING_COLUMN_VALUE :
state.assignmentStrategy;
- System.out.printf(format, "GROUP", "COORDINATOR (ID)",
"ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
- System.out.printf(format, state.group, coordinator,
state.assignmentStrategy, state.groupState, state.numMembers);
+ if (verbose) {
+ String format = "\n%" + -groupColLen + "s %" +
-coordinatorColLen + "s %-20s %-20s %-15s %-25s %s";
+ String groupEpoch =
state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String targetAssignmentEpoch =
state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ System.out.printf(format, "GROUP", "COORDINATOR (ID)",
"ASSIGNMENT-STRATEGY", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH",
"#MEMBERS");
+ System.out.printf(format, state.group, coordinator,
assignmentStrategy, state.groupState, groupEpoch, targetAssignmentEpoch,
state.numMembers);
Review Comment:
nit: Should we just inline groupEpoch and targetAssignmentEpoch like you did
in the other cases?
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
maxHostLen = Math.max(maxHostLen,
memberAssignment.host.length());
maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId.length());
includeGroupInstanceId = includeGroupInstanceId ||
!memberAssignment.groupInstanceId.isEmpty();
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ maxCurrentAssignment =
Math.max(maxCurrentAssignment, currentAssignment.length());
+ maxTargetAssignment =
Math.max(maxTargetAssignment, targetAssignment.length());
}
}
}
String format0 = "%" + -maxGroupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s
%" + -maxClientIdLen + "s %-15s ";
Review Comment:
nit: Should we call this one `formatWithGroupInstanceId`?
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
maxHostLen = Math.max(maxHostLen,
memberAssignment.host.length());
maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId.length());
includeGroupInstanceId = includeGroupInstanceId ||
!memberAssignment.groupInstanceId.isEmpty();
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ maxCurrentAssignment =
Math.max(maxCurrentAssignment, currentAssignment.length());
+ maxTargetAssignment =
Math.max(maxTargetAssignment, targetAssignment.length());
}
}
}
String format0 = "%" + -maxGroupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s
%" + -maxClientIdLen + "s %-15s ";
String format1 = "%" + -maxGroupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s ";
+ String verboseFormat = "%-15s %" + -maxCurrentAssignment + "s
%-15s %" + -maxTargetAssignment + "s %s";
if (includeGroupInstanceId) {
System.out.printf("\n" + format0, "GROUP", "CONSUMER-ID",
"GROUP-INSTANCE-ID", "HOST", "CLIENT-ID", "#PARTITIONS");
} else {
System.out.printf("\n" + format1, "GROUP", "CONSUMER-ID",
"HOST", "CLIENT-ID", "#PARTITIONS");
}
if (verbose)
- System.out.printf("%s", "ASSIGNMENT");
+ System.out.printf(verboseFormat, "CURRENT-EPOCH",
"CURRENT-ASSIGNMENT", "TARGET-EPOCH", "TARGET-ASSIGNMENT", "UPGRADED");
Review Comment:
Didn't we say that we would display `UPGRADED` only when there are
non-upgraded members in the group?
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
memberAssignment.host,
memberAssignment.clientId, memberAssignment.numPartitions);
}
if (verbose) {
- String partitions;
-
- if (memberAssignment.assignment.isEmpty())
- partitions = MISSING_COLUMN_VALUE;
- else {
- Map<String, List<TopicPartition>> grouped =
new HashMap<>();
- memberAssignment.assignment.forEach(
- tp -> grouped.computeIfAbsent(tp.topic(),
key -> new ArrayList<>()).add(tp));
- partitions =
grouped.values().stream().map(topicPartitions ->
-
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
"(", ")"))
- ).sorted().collect(Collectors.joining(", "));
- }
- System.out.printf("%s", partitions);
+ String currentEpoch =
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetEpoch =
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ String upgraded =
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ System.out.printf(verboseFormat, currentEpoch,
currentAssignment, targetEpoch, targetAssignment, upgraded);
}
System.out.println();
}
}
});
}
- private void printStates(Map<String, GroupInformation> states) {
+ private String getAssignmentString(List<TopicPartition> assignment) {
+ Map<String, List<TopicPartition>> grouped = new HashMap<>();
+ assignment.forEach(
+ tp -> grouped.computeIfAbsent(tp.topic(), key -> new
ArrayList<>()).add(tp));
+ return grouped.entrySet().stream().map(entry -> {
+ String topicName = entry.getKey();
+ List<TopicPartition> topicPartitions = entry.getValue();
+ return
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
topicName + ":", ""));
Review Comment:
nit: This line is pretty long, don't you think? It would be great if we
could break it.
```
topicPartitions
.stream()
.map(TopicPartition::partition)
.map(Object::toString)
.sorted()
.collect(Collectors.joining(",", topicName + ":", ""))
```
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
memberAssignment.host,
memberAssignment.clientId, memberAssignment.numPartitions);
}
if (verbose) {
- String partitions;
-
- if (memberAssignment.assignment.isEmpty())
- partitions = MISSING_COLUMN_VALUE;
- else {
- Map<String, List<TopicPartition>> grouped =
new HashMap<>();
- memberAssignment.assignment.forEach(
- tp -> grouped.computeIfAbsent(tp.topic(),
key -> new ArrayList<>()).add(tp));
- partitions =
grouped.values().stream().map(topicPartitions ->
-
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
"(", ")"))
- ).sorted().collect(Collectors.joining(", "));
- }
- System.out.printf("%s", partitions);
+ String currentEpoch =
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetEpoch =
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ String upgraded =
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ System.out.printf(verboseFormat, currentEpoch,
currentAssignment, targetEpoch, targetAssignment, upgraded);
}
System.out.println();
}
}
});
}
- private void printStates(Map<String, GroupInformation> states) {
+ private String getAssignmentString(List<TopicPartition> assignment) {
+ Map<String, List<TopicPartition>> grouped = new HashMap<>();
+ assignment.forEach(
+ tp -> grouped.computeIfAbsent(tp.topic(), key -> new
ArrayList<>()).add(tp));
Review Comment:
nit: How about this?
```
assignment.forEach(tp ->
grouped
.computeIfAbsent(tp.topic(), key -> new ArrayList<>())
.add(tp)
);
```
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -394,14 +415,17 @@ private static String
printOffsetFormat(Optional<Collection<PartitionAssignmentS
}
}
- return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
+ return verbose ?
+ "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-15s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s
%s" :
+ "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
Review Comment:
nit: It may be better to use a regular if/else for this one. I find it hard
to read like this.
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String,
Entry<Optional<GroupState>, Optional<Colle
maxHostLen = Math.max(maxHostLen,
memberAssignment.host.length());
maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId.length());
includeGroupInstanceId = includeGroupInstanceId ||
!memberAssignment.groupInstanceId.isEmpty();
+ String currentAssignment =
memberAssignment.assignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
+ String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ maxCurrentAssignment =
Math.max(maxCurrentAssignment, currentAssignment.length());
+ maxTargetAssignment =
Math.max(maxTargetAssignment, targetAssignment.length());
}
}
}
String format0 = "%" + -maxGroupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s
%" + -maxClientIdLen + "s %-15s ";
String format1 = "%" + -maxGroupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s ";
Review Comment:
nit: Should we call this one `formatWithoutGroupInstanceId`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]