This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f1a1cd8387 MINOR: Refactor awaitNonEmptyRecords to remove dead code and improve clarity (#19868)
8f1a1cd8387 is described below

commit 8f1a1cd8387188be90d6baee8ed678706dd24bc4
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Mon Jun 2 12:46:56 2025 +0800

    MINOR: Refactor awaitNonEmptyRecords to remove dead code and improve clarity (#19868)
    
    This refactor improves the implementation of `awaitNonEmptyRecords` by:
    
    - Removing the unreachable `throw new IllegalStateException` statement,
    which was dead code due to `pollRecordsUntilTrue` throwing exceptions on
    timeout.
    - Eliminating the use of `return` inside the lambda, which relies on
    non-local returns that can be confusing and error-prone in Scala.
    
    Reviewers: Yung <[email protected]>, Ken Huang
     <[email protected]>, TengYao Chi <[email protected]>
    
    ---------
    
    Co-authored-by: Jing-Jia Hung <[email protected]>
---
 .../scala/integration/kafka/api/AbstractConsumerTest.scala     | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 517614d84a1..6a60621308b 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -94,12 +94,14 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
   def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V],
                                  partition: TopicPartition,
                                  pollTimeoutMs: Long = 100): ConsumerRecords[K, V] = {
+    var result: ConsumerRecords[K, V] = null
+
     TestUtils.pollRecordsUntilTrue(consumer, (polledRecords: ConsumerRecords[K, V]) => {
-      if (polledRecords.records(partition).asScala.nonEmpty)
-        return polledRecords
-      false
+      val hasRecords = !polledRecords.records(partition).isEmpty
+      if (hasRecords) result = polledRecords
+      hasRecords
     }, s"Consumer did not consume any messages for partition $partition before timeout.", JTestUtils.DEFAULT_MAX_WAIT_MS, pollTimeoutMs)
-    throw new IllegalStateException("Should have timed out before reaching here")
+    result
   }
 
   /**

Reply via email to