This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new cf269bee8c9 KAFKA-18046; High CPU usage when using Log4j2 (#19138)
cf269bee8c9 is described below

commit cf269bee8c909cd0879556063ce3aafc450f0c76
Author: David Jacot <[email protected]>
AuthorDate: Fri Mar 7 09:03:32 2025 +0100

    KAFKA-18046; High CPU usage when using Log4j2 (#19138)
    
    This patch is a first step towards resolving KAFKA-18046. Apache Kafka
    4.0 ships with log4j2, so the issue raised in the ticket is a real
    problem: LoggerFactory.getLogger() is called while handling every fetch
    response, which causes high CPU usage on the fetch path. Hence, I propose
    to fix that case by caching the Logger used by the `CompletedFetch` class.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma <[email protected]>
---
 .../org/apache/kafka/clients/consumer/internals/AbstractFetch.java  | 6 +++++-
 .../org/apache/kafka/clients/consumer/internals/CompletedFetch.java | 5 ++---
 .../apache/kafka/clients/consumer/internals/CompletedFetchTest.java | 2 +-
 .../apache/kafka/clients/consumer/internals/FetchBufferTest.java    | 2 +-
 .../apache/kafka/clients/consumer/internals/FetchCollectorTest.java | 2 +-
 5 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 8451ded8d85..5083f7733b8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -64,6 +64,9 @@ import static 
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMeta
 public abstract class AbstractFetch implements Closeable {
 
     private final Logger log;
+    // Calling LoggerFactory.getLogger() is pretty expensive with log4j2. See 
KAFKA-18046 for details.
+    // We cache the logger used by CompletedFetch because a CompletedFetch is created for 
every fetch response.
+    private final Logger completedFetchLog;
     private final IdempotentCloser idempotentCloser = new IdempotentCloser();
     protected final LogContext logContext;
     protected final ConsumerMetadata metadata;
@@ -88,6 +91,7 @@ public abstract class AbstractFetch implements Closeable {
                          final Time time,
                          final ApiVersions apiVersions) {
         this.log = logContext.logger(AbstractFetch.class);
+        this.completedFetchLog = logContext.logger(CompletedFetch.class);
         this.logContext = logContext;
         this.metadata = metadata;
         this.subscriptions = subscriptions;
@@ -208,7 +212,7 @@ public abstract class AbstractFetch implements Closeable {
                 }
 
                 CompletedFetch completedFetch = new CompletedFetch(
-                        logContext,
+                        completedFetchLog,
                         subscriptions,
                         decompressionBufferSupplier,
                         partition,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 2cba76588e5..d615c21318d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
 
@@ -83,7 +82,7 @@ public class CompletedFetch {
     private boolean isConsumed = false;
     private boolean initialized = false;
 
-    CompletedFetch(LogContext logContext,
+    CompletedFetch(Logger log,
                    SubscriptionState subscriptions,
                    BufferSupplier decompressionBufferSupplier,
                    TopicPartition partition,
@@ -91,7 +90,7 @@ public class CompletedFetch {
                    FetchMetricsAggregator metricAggregator,
                    Long fetchOffset,
                    short requestVersion) {
-        this.log = logContext.logger(CompletedFetch.class);
+        this.log = log;
         this.subscriptions = subscriptions;
         this.decompressionBufferSupplier = decompressionBufferSupplier;
         this.partition = partition;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index e12b0121fd4..20389081ffd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -221,7 +221,7 @@ public class CompletedFetchTest {
         FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metrics, Collections.singleton(TP));
 
         return new CompletedFetch(
-                logContext,
+                logContext.logger(CompletedFetch.class),
                 subscriptions,
                 BufferSupplier.create(),
                 TP,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
index 1f33a6e020f..7c831f2d487 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -192,7 +192,7 @@ public class FetchBufferTest {
         FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData();
         FetchMetricsAggregator metricsAggregator = new 
FetchMetricsAggregator(metricsManager, allPartitions);
         return new CompletedFetch(
-                logContext,
+                logContext.logger(CompletedFetch.class),
                 subscriptions,
                 BufferSupplier.create(),
                 tp,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 68b9ecb528b..01d52b3662b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -916,7 +916,7 @@ public class FetchCollectorTest {
 
             FetchMetricsAggregator metricsAggregator = new 
FetchMetricsAggregator(metricsManager, allPartitions);
             return new CompletedFetch(
-                    logContext,
+                    logContext.logger(CompletedFetch.class),
                     subscriptions,
                     BufferSupplier.create(),
                     topicPartition,

Reply via email to