This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f1a1cd8387 MINOR: Refactor awaitNonEmptyRecords to remove dead code
and improve clarity (#19868)
8f1a1cd8387 is described below
commit 8f1a1cd8387188be90d6baee8ed678706dd24bc4
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Mon Jun 2 12:46:56 2025 +0800
MINOR: Refactor awaitNonEmptyRecords to remove dead code and improve
clarity (#19868)
This refactor improves the implementation of `awaitNonEmptyRecords` by:
- Removing the unreachable `throw new IllegalStateException` statement,
which was dead code due to `pollRecordsUntilTrue` throwing exceptions on
timeout.
- Eliminating the use of `return` inside the lambda, which relies on
non-local returns that can be confusing and error-prone in Scala.
Reviewers: Yung <[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>
---------
Co-authored-by: Jing-Jia Hung <[email protected]>
---
.../scala/integration/kafka/api/AbstractConsumerTest.scala | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 517614d84a1..6a60621308b 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -94,12 +94,14 @@ abstract class AbstractConsumerTest extends BaseRequestTest
{
def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V],
partition: TopicPartition,
pollTimeoutMs: Long = 100):
ConsumerRecords[K, V] = {
+ var result: ConsumerRecords[K, V] = null
+
TestUtils.pollRecordsUntilTrue(consumer, (polledRecords:
ConsumerRecords[K, V]) => {
- if (polledRecords.records(partition).asScala.nonEmpty)
- return polledRecords
- false
+ val hasRecords = !polledRecords.records(partition).isEmpty
+ if (hasRecords) result = polledRecords
+ hasRecords
}, s"Consumer did not consume any messages for partition $partition before
timeout.", JTestUtils.DEFAULT_MAX_WAIT_MS, pollTimeoutMs)
- throw new IllegalStateException("Should have timed out before reaching
here")
+ result
}
/**