This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f68a149a184 KAFKA-18509 Move StateChangeLogger to server-common module
(#20637)
f68a149a184 is described below
commit f68a149a184e264ea709d10de54aec8fdbcf096b
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 6 22:56:57 2025 +0800
KAFKA-18509 Move StateChangeLogger to server-common module (#20637)
We can rewrite this class from scala to java and move to server-common
module. To maintain backward compatibility, we should keep the logger
name `state.change.logger`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 4 +-
.../scala/kafka/controller/StateChangeLogger.scala | 45 ------------------
.../main/scala/kafka/server/ReplicaManager.scala | 12 ++---
.../org/apache/kafka/logger/StateChangeLogger.java | 54 ++++++++++++++++++++++
4 files changed, 62 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 44b28a1f07e..4c729f59606 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,6 @@ import java.lang.{Long => JLong}
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap,
CopyOnWriteArrayList}
-import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.server._
import kafka.server.share.DelayedShareFetch
@@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords,
MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState,
MetadataCache, PartitionRegistration}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition,
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
- private val stateChangeLogger = new StateChangeLogger(localBrokerId,
inControllerContext = false, None)
+ private val stateChangeLogger = new StateChangeLogger(localBrokerId)
private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]
// The read lock is only required when multiple reads are executed and needs
to be in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
deleted file mode 100644
index 9f188fe33b7..00000000000
--- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.controller
-
-import com.typesafe.scalalogging.Logger
-import kafka.utils.Logging
-
-object StateChangeLogger {
- private val logger = Logger("state.change.logger")
-}
-
-/**
- * Simple class that sets `logIdent` appropriately depending on whether the
state change logger is being used in the
- * context of the KafkaController or not (e.g. ReplicaManager and
MetadataCache log to the state change logger
- * irrespective of whether the broker is the Controller).
- */
-class StateChangeLogger(brokerId: Int, inControllerContext: Boolean,
controllerEpoch: Option[Int]) extends Logging {
-
- if (controllerEpoch.isDefined && !inControllerContext)
- throw new IllegalArgumentException("Controller epoch should only be
defined if inControllerContext is true")
-
- override lazy val logger: Logger = StateChangeLogger.logger
-
- locally {
- val prefix = if (inControllerContext) "Controller" else "Broker"
- val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch")
- logIdent = s"[$prefix id=$brokerId$epochEntry] "
- }
-
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 10b41b88bd0..ff70d7ae34a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener}
-import kafka.controller.StateChangeLogger
import kafka.log.LogManager
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
@@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig,
TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
+import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal,
StopPartition}
@@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
@volatile private var isInControlledShutdown = false
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
- protected val stateChangeLogger = new StateChangeLogger(localBrokerId,
inControllerContext = false, None)
+ protected val stateChangeLogger = new StateChangeLogger(localBrokerId)
private var logDirFailureHandler: LogDirFailureHandler = _
@@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig,
hasCustomErrorMessage = customException.isDefined
)
}
- // In non-transaction paths, errorResults is typically empty, so we can
+ // In non-transaction paths, errorResults is typically empty, so we can
// directly use entriesPerPartition instead of creating a new filtered
collection
- val entriesWithoutErrorsPerPartition =
+ val entriesWithoutErrorsPerPartition =
if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _)
=> !errorResults.contains(key) }
else entriesPerPartition
@@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig,
remoteFetchPartitionStatus:
Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition,
CompletableFuture[RemoteLogReadResult]]
-
+
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
val (task, result) = processRemoteFetch(remoteFetchInfo)
remoteFetchTasks.put(topicIdPartition, task)
remoteFetchResults.put(topicIdPartition, result)
}
-
+
val remoteFetchMaxWaitMs =
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks,
remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
remoteFetchPartitionStatus, params, logReadResults, this,
responseCallback)
diff --git
a/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java
b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java
new file mode 100644
index 00000000000..a8f7ed9cc9d
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.logger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple class that sets logIdent appropriately depending on whether the
state change logger is being used in the
+ * context of the broker (e.g. ReplicaManager and Partition).
+ */
+public class StateChangeLogger {
+ private static final Logger LOGGER =
LoggerFactory.getLogger("state.change.logger");
+
+ private final String logIdent;
+
+ public StateChangeLogger(int brokerId) {
+ this.logIdent = String.format("[Broker id=%d] ", brokerId);
+ }
+
+ public void trace(String message) {
+ LOGGER.info("{}{}", logIdent, message);
+ }
+
+ public void info(String message) {
+ LOGGER.info("{}{}", logIdent, message);
+ }
+
+ public void warn(String message) {
+ LOGGER.warn("{}{}", logIdent, message);
+ }
+
+ public void error(String message) {
+ LOGGER.error("{}{}", logIdent, message);
+ }
+
+ public void error(String message, Throwable e) {
+ LOGGER.error("{}{}", logIdent, message, e);
+ }
+}