This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8cf2f9a61af KAFKA-18046; High CPU usage when using Log4j2 (#19138)
8cf2f9a61af is described below
commit 8cf2f9a61afd5e6e86804a16a10f3ffa11f3bc36
Author: David Jacot <[email protected]>
AuthorDate: Fri Mar 7 09:03:32 2025 +0100
KAFKA-18046; High CPU usage when using Log4j2 (#19138)
This patch is a first step towards resolving KAFKA-18046. Apache Kafka
4.0 ships with log4j2, with which LoggerFactory.getLogger() is expensive.
As raised in the ticket, calling it while handling every fetch response
causes high CPU usage on the fetch path. Hence, I propose to fix that
case by caching the Logger used by the `CompletedFetch` class.
Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>
---
.../org/apache/kafka/clients/consumer/internals/AbstractFetch.java | 6 +++++-
.../org/apache/kafka/clients/consumer/internals/CompletedFetch.java | 5 ++---
.../apache/kafka/clients/consumer/internals/CompletedFetchTest.java | 2 +-
.../apache/kafka/clients/consumer/internals/FetchBufferTest.java | 2 +-
.../apache/kafka/clients/consumer/internals/FetchCollectorTest.java | 2 +-
5 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 8451ded8d85..5083f7733b8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -64,6 +64,9 @@ import static
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMeta
public abstract class AbstractFetch implements Closeable {
private final Logger log;
+ // Calling LoggerFactory.getLogger() is pretty expensive with log4j2. See
KAFKA-18046 for details.
+ // We cache the logger used by CompletedFetch because it is created on
every fetch response.
+ private final Logger completedFetchLog;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
protected final LogContext logContext;
protected final ConsumerMetadata metadata;
@@ -88,6 +91,7 @@ public abstract class AbstractFetch implements Closeable {
final Time time,
final ApiVersions apiVersions) {
this.log = logContext.logger(AbstractFetch.class);
+ this.completedFetchLog = logContext.logger(CompletedFetch.class);
this.logContext = logContext;
this.metadata = metadata;
this.subscriptions = subscriptions;
@@ -208,7 +212,7 @@ public abstract class AbstractFetch implements Closeable {
}
CompletedFetch completedFetch = new CompletedFetch(
- logContext,
+ completedFetchLog,
subscriptions,
decompressionBufferSupplier,
partition,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index a505de2dc12..73a84d90cbf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -83,7 +82,7 @@ public class CompletedFetch {
private boolean isConsumed = false;
private boolean initialized = false;
- CompletedFetch(LogContext logContext,
+ CompletedFetch(Logger log,
SubscriptionState subscriptions,
BufferSupplier decompressionBufferSupplier,
TopicPartition partition,
@@ -91,7 +90,7 @@ public class CompletedFetch {
FetchMetricsAggregator metricAggregator,
Long fetchOffset,
short requestVersion) {
- this.log = logContext.logger(CompletedFetch.class);
+ this.log = log;
this.subscriptions = subscriptions;
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.partition = partition;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index b804b6b8ae2..cbcec2cfa98 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -221,7 +221,7 @@ public class CompletedFetchTest {
FetchMetricsAggregator metricAggregator = new
FetchMetricsAggregator(metrics, Collections.singleton(TP));
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
TP,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
index 1f33a6e020f..7c831f2d487 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -192,7 +192,7 @@ public class FetchBufferTest {
FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData();
FetchMetricsAggregator metricsAggregator = new
FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
tp,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index d7d53537ae6..11647175fc5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -915,7 +915,7 @@ public class FetchCollectorTest {
FetchMetricsAggregator metricsAggregator = new
FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
- logContext,
+ logContext.logger(CompletedFetch.class),
subscriptions,
BufferSupplier.create(),
topicPartition,