chia7712 commented on code in PR #20159:
URL: https://github.com/apache/kafka/pull/20159#discussion_r2480404396


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -415,7 +415,9 @@ private long sendProducerData(long now) {
         for (ProducerBatch expiredBatch : expiredBatches) {
             String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                 + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
-            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
+            KafkaException potentialCause = new KafkaException(
+                    "The broker might be unavailable or responding slowly, or the CPU might be busy.");
+            failBatch(expiredBatch, new TimeoutException(errorMessage, potentialCause), false);

Review Comment:
   It seems to me that the "reason" is too specific and might confuse users 
   when they are tracing logs. Perhaps the "reason" could be "no server response 
   yet" or "the request has not been sent". WDYT?
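
   A minimal sketch of what a more generic cause could look like, assuming 
   stand-in exception classes instead of the real Kafka ones so the snippet 
   runs on its own. The `expiryError` helper and its `requestSent` flag are 
   hypothetical and only illustrate choosing between the two suggested wordings; 
   they are not part of `Sender`.

   ```java
   class ExpiryCauseSketch {
       // Simplified stand-in for org.apache.kafka.common.KafkaException (assumption).
       static class KafkaException extends RuntimeException {
           KafkaException(String message) { super(message); }
           KafkaException(String message, Throwable cause) { super(message, cause); }
       }

       // Simplified stand-in for org.apache.kafka.common.errors.TimeoutException (assumption).
       static class TimeoutException extends KafkaException {
           TimeoutException(String message, Throwable cause) { super(message, cause); }
       }

       // Hypothetical helper: builds the expiry error with a generic, non-speculative cause.
       static TimeoutException expiryError(int recordCount, String topicPartition,
                                           long elapsedMs, boolean requestSent) {
           String errorMessage = "Expiring " + recordCount + " record(s) for " + topicPartition
               + ":" + elapsedMs + " ms has passed since batch creation";
           // Describe what the client observed, not a guess about brokers or CPU load.
           String reason = requestSent ? "no server response yet" : "the request has not been sent";
           return new TimeoutException(errorMessage, new KafkaException(reason));
       }
   }
   ```

   With this shape the top-level message is unchanged and the cause only states 
   what the producer itself knows at expiry time.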



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3908,6 +3922,22 @@ private void assertFutureFailure(Future<?> future, Class<? extends Exception> ex
         }
     }
 
+    private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType,
+                                     Class<? extends Exception> rootCauseExceptionType,
+                                     String rootCauseExceptionMessage)
+            throws InterruptedException {
+        assertTrue(future.isDone());
+        try {
+            future.get();

Review Comment:
   Would you mind using `assertThrows`?
   
   ```java
       private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType,
                                        Class<? extends Exception> rootCauseExceptionType,
                                        String rootCauseExceptionMessage) {
           assertTrue(future.isDone());
           var e = assertThrows(ExecutionException.class, future::get);
           Class<? extends Throwable> causeType = e.getCause().getClass();
           assertTrue(expectedExceptionType.isAssignableFrom(causeType), "Unexpected cause " + causeType.getName());
           assertInstanceOf(rootCauseExceptionType, e.getCause().getCause(), rootCauseExceptionMessage);
       }
   ```
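
   For illustration, the same cause-chain check can be sketched without JUnit 
   so it runs on its own. The class and method names (`CauseChainCheck`, 
   `rootCauseOf`) are hypothetical, and plain `AssertionError`s stand in for 
   the JUnit assertions. Returning the root cause also lets the caller compare 
   its message with an expected string, if that is the intent of 
   `rootCauseExceptionMessage`.

   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;

   class CauseChainCheck {
       // Hypothetical helper: verifies future -> ExecutionException -> cause -> root cause.
       static Throwable rootCauseOf(Future<?> future,
                                    Class<? extends Exception> expectedExceptionType,
                                    Class<? extends Exception> rootCauseExceptionType) {
           if (!future.isDone()) {
               throw new AssertionError("Future is not done");
           }
           try {
               future.get();
           } catch (ExecutionException e) {
               Throwable cause = e.getCause();
               if (!expectedExceptionType.isInstance(cause)) {
                   throw new AssertionError("Unexpected cause " + cause);
               }
               Throwable rootCause = cause.getCause();
               if (!rootCauseExceptionType.isInstance(rootCause)) {
                   throw new AssertionError("Unexpected root cause " + rootCause);
               }
               return rootCause;
           } catch (InterruptedException e) {
               // Restore the interrupt flag and report the interruption as a failure.
               Thread.currentThread().interrupt();
               throw new AssertionError("Interrupted while reading the future", e);
           }
           throw new AssertionError("Expected the future to fail");
       }
   }
   ```

   A caller could then check the message separately, for example by comparing 
   `rootCauseOf(...).getMessage()` with the expected text.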



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.