This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1e9aa1c655 MINOR: Fix flaky tests in Tools modules (#20225)
f1e9aa1c655 is described below
commit f1e9aa1c6553855206f52b17c50eed537a6611e2
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Wed Jul 23 12:10:18 2025 +0530
MINOR: Fix flaky tests in Tools modules (#20225)
### Problem
The
`ShareGroupCommandTest.testDeleteShareGroupOffsetsArgsWithoutTopic()`,
`ShareGroupCommandTest.testDeleteShareGroupOffsetsArgsWithoutGroup()`,
`ResetStreamsGroupOffsetTest.testResetOffsetsWithoutGroupOption()`,
`DeleteStreamsGroupTest.testDeleteWithoutGroupOption()`,
and `DescribeStreamsGroupTest.testDescribeWithoutGroupOption()` tests were
flaky due to a dependency on Set iteration order in error message
generation.
### Root Cause
The cleanup [commit](https://github.com/apache/kafka/pull/20091) that
replaced `new HashSet<>(Arrays.asList(...))` with `Set.of(...)` in
ShareGroupCommandOptions and StreamsGroupCommandOptions changed the
iteration characteristics of collections used for error message
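generation: the iteration order of a set created by `Set.of(...)` is
unspecified and can vary between JVM runs.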
This produces different orders like `[topic], [group]` vs `[group],
[topic]`, but the tests expected a specific order, causing intermittent
failures.
### Solution
Fix the root cause by ensuring deterministic error message generation
through alphabetical sorting of option names.
Reviewers: ShivsundarR <[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>
---
.../kafka/tools/consumer/group/ShareGroupCommandOptions.java | 6 +++---
.../kafka/tools/streams/StreamsGroupCommandOptions.java | 12 ++++++------
.../kafka/tools/consumer/group/ShareGroupCommandTest.java | 4 ++--
3 files changed, 11 insertions(+), 11 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 9a96cad00ed..f155a9c4b5e 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -159,11 +159,11 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(describeOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " takes one of these options: "
+
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + describeOpt + " takes one of these options: "
+
allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt,
offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ?
1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
}
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndExit(parser,
@@ -185,7 +185,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(deleteOffsetsOpt)) {
if (!options.has(groupOpt) || !options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + deleteOffsetsOpt + " takes the following
options: " +
allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + deleteOffsetsOpt + " takes the following
options: " +
allDeleteOffsetsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
}
if (options.has(resetOffsetsOpt)) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index 6ce387d3dbd..d6715f833d2 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -222,7 +222,7 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(deleteOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + deleteOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + deleteOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
if (options.has(inputTopicOpt) || options.has(allInputTopicsOpt))
CommandLineUtils.printUsageAndExit(parser, "Kafka Streams does
not support topic-specific offset " +
"deletion from a streams group.");
@@ -253,11 +253,11 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + describeOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt,
offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 :
0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
}
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndExit(parser,
@@ -267,7 +267,7 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
private void checkDeleteOffsetsArgs() {
if ((!options.has(inputTopicOpt) && !options.has(allInputTopicsOpt))
|| !options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + deleteOffsetsOpt + " takes the " + groupOpt + "
and one of these options: " +
allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + deleteOffsetsOpt + " takes the " + groupOpt + "
and one of these options: " +
allDeleteOffsetsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
if (options.valuesOf(groupOpt).size() > 1)
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOffsetsOpt + " supports only one " +
groupOpt + " at a time, but found: " + options.valuesOf(groupOpt));
@@ -286,7 +286,7 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + resetOffsetsOpt + " takes one of these options: "
+
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + resetOffsetsOpt + " takes one of these options: "
+
allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,
minus(allResetOffsetScenarioOpts, resetToOffsetOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt,
minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
@@ -301,7 +301,7 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
private void checkDeleteAllInternalTopicsArgs() {
if (!options.has(resetOffsetsOpt) && !options.has(deleteOpt)) {
CommandLineUtils.printUsageAndExit(parser,
- "Option " + deleteAllInternalTopicsOpt + " takes one of these
options: " +
allDeleteInternalGroupsOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ "Option " + deleteAllInternalTopicsOpt + " takes one of these
options: " +
allDeleteInternalGroupsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
} else if (options.has(resetOffsetsOpt) && !options.has(executeOpt)) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteAllInternalTopicsOpt + " takes " +
executeOpt + " when " + resetOffsetsOpt + " is used.");
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index b7f59f1a991..19a11ce0f90 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -626,7 +626,7 @@ public class ShareGroupCommandTest {
AtomicBoolean exited = new AtomicBoolean(false);
Exit.setExitProcedure(((statusCode, message) -> {
assertNotEquals(0, statusCode);
- assertTrue(message.contains("Option [delete-offsets] takes the
following options: [topic], [group]"));
+ assertTrue(message.contains("Option [delete-offsets] takes the
following options: [group], [topic]"));
exited.set(true);
}));
try {
@@ -646,7 +646,7 @@ public class ShareGroupCommandTest {
AtomicBoolean exited = new AtomicBoolean(false);
Exit.setExitProcedure(((statusCode, message) -> {
assertNotEquals(0, statusCode);
- assertTrue(message.contains("Option [delete-offsets] takes the
following options: [topic], [group]"));
+ assertTrue(message.contains("Option [delete-offsets] takes the
following options: [group], [topic]"));
exited.set(true);
}));
try {