dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100441588


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2455,7 +2454,7 @@ private void close(Duration timeout, boolean 
swallowException) {
             closeTimer.reset(remainingDurationInTimeout);
 
             // This is a blocking call bound by the time remaining in 
closeTimer
-            LambdaUtils.swallow(() -> fetcher.close(closeTimer), 
firstException);
+            Utils.swallow(log, " fetcher close", () -> 
fetcher.close(closeTimer), firstException);

Review Comment:
   nit: There is an extra space before `fetcher`.



##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -238,7 +238,7 @@ public KafkaClusterTestKit build() throws Exception {
                                 bootstrapMetadata);
                     } catch (Throwable e) {
                         log.error("Error creating controller {}", node.id(), 
e);
-                        Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController());
+                        Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController(), null);

Review Comment:
   nit: Should we add an overload to avoid having to pass null here?



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -998,15 +998,18 @@ public static void closeAll(Closeable... closeables) 
throws IOException {
             throw exception;
     }
 
-    public static void swallow(
-        Logger log,
-        String what,
-        Runnable runnable
-    ) {
-        try {
-            runnable.run();
-        } catch (Throwable e) {
-            log.warn("{} error", what, e);
+    /**
+     * Run the supplied code. If an exception is thrown, it is swallowed and 
registered to the firstException parameter.
+     */
+    public static void swallow(Logger log, String what, final Runnable code, 
final AtomicReference<Throwable> firstException) {
+        if (code != null) {
+            try {
+                code.run();
+            } catch (Throwable t) {
+                log.warn("{} error", what, t);

Review Comment:
   Should this be an error instead of a warn to be consistent with 
`closeQuietly`? Moreover, I wonder if we could improve the error message. We 
would get something like `fetcher close error` which is not really inline with 
what we usually log. For instance, `closeQuietly` would log something like 
`Failed to close fetch...`. Do you have any thoughts on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to