This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new bf9912a1eb6 KAFKA-18046; High CPU usage when using Log4j2 (#19138)
bf9912a1eb6 is described below
commit bf9912a1eb6ec278cbe9bfdb5417b5291f54fe0d
Author: David Jacot <[email protected]>
AuthorDate: Fri Mar 7 09:03:32 2025 +0100
KAFKA-18046; High CPU usage when using Log4j2 (#19138)
This patch is a first step towards resolving KAFKA-18046. Apache Kafka
4.0 ships with log4j2, which makes the issue raised in the ticket a real
concern: LoggerFactory.getLogger() is called while handling every fetch
response, causing high CPU usage on the fetch path. Hence, I propose to
fix that by caching the Logger used by the `CompletedFetch` class.
Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>
---
.../org/apache/kafka/clients/consumer/internals/AbstractFetch.java | 6 +++++-
.../org/apache/kafka/clients/consumer/internals/CompletedFetch.java | 5 ++---
.../apache/kafka/clients/consumer/internals/CompletedFetchTest.java | 2 +-
.../apache/kafka/clients/consumer/internals/FetchBufferTest.java | 2 +-
.../apache/kafka/clients/consumer/internals/FetchCollectorTest.java | 2 +-
5 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 8f5691d17c0..2406b9f5c89 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -61,6 +61,9 @@ import static
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMeta
public abstract class AbstractFetch implements Closeable {
private final Logger log;
+ // Calling LoggerFactory.getLogger() is pretty expensive with log4j2. See
KAFKA-18046 for details.
+ // We cache the logger used by CompletedFetch because it is created on
every fetch response.
+ private final Logger completedFetchLog;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
protected final LogContext logContext;
protected final ConsumerMetadata metadata;
@@ -85,6 +88,7 @@ public abstract class AbstractFetch implements Closeable {
final Time time,
final ApiVersions apiVersions) {
this.log = logContext.logger(AbstractFetch.class);
+ this.completedFetchLog = logContext.logger(CompletedFetch.class);
this.logContext = logContext;
this.metadata = metadata;
this.subscriptions = subscriptions;
@@ -205,7 +209,7 @@ public abstract class AbstractFetch implements Closeable {
}
CompletedFetch completedFetch = new CompletedFetch(
- logContext,
+ completedFetchLog,
subscriptions,
decompressionBufferSupplier,
partition,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 2cba76588e5..d615c21318d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -83,7 +82,7 @@ public class CompletedFetch {
private boolean isConsumed = false;
private boolean initialized = false;
- CompletedFetch(LogContext logContext,
+ CompletedFetch(Logger log,
SubscriptionState subscriptions,
BufferSupplier decompressionBufferSupplier,
TopicPartition partition,
@@ -91,7 +90,7 @@ public class CompletedFetch {
FetchMetricsAggregator metricAggregator,
Long fetchOffset,
short requestVersion) {
- this.log = logContext.logger(CompletedFetch.class);
+ this.log = log;
this.subscriptions = subscriptions;
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.partition = partition;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index f7be3a58ffd..8f841aa4820 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -222,7 +222,7 @@ public class CompletedFetchTest {
FetchMetricsAggregator metricAggregator = new
FetchMetricsAggregator(metrics, Collections.singleton(TP));
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
TP,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
index 1f33a6e020f..7c831f2d487 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -192,7 +192,7 @@ public class FetchBufferTest {
FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData();
FetchMetricsAggregator metricsAggregator = new
FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
tp,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 60d54f863db..554489279c1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -815,7 +815,7 @@ public class FetchCollectorTest {
FetchMetricsAggregator metricsAggregator = new
FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
topicPartition,