This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f68a149a184 KAFKA-18509 Move StateChangeLogger to server-common module 
(#20637)
f68a149a184 is described below

commit f68a149a184e264ea709d10de54aec8fdbcf096b
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 6 22:56:57 2025 +0800

    KAFKA-18509 Move StateChangeLogger to server-common module (#20637)
    
    We can rewrite this class from scala to java and move to server-common
    module.  To maintain backward compatibility, we should keep the logger
    name `state.change.logger`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  4 +-
 .../scala/kafka/controller/StateChangeLogger.scala | 45 ------------------
 .../main/scala/kafka/server/ReplicaManager.scala   | 12 ++---
 .../org/apache/kafka/logger/StateChangeLogger.java | 54 ++++++++++++++++++++++
 4 files changed, 62 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 44b28a1f07e..4c729f59606 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,6 @@ import java.lang.{Long => JLong}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, 
CopyOnWriteArrayList}
-import kafka.controller.StateChangeLogger
 import kafka.log._
 import kafka.server._
 import kafka.server.share.DelayedShareFetch
@@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, 
MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.logger.StateChangeLogger
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, 
MetadataCache, PartitionRegistration}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition,
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
 
-  private val stateChangeLogger = new StateChangeLogger(localBrokerId, 
inControllerContext = false, None)
+  private val stateChangeLogger = new StateChangeLogger(localBrokerId)
   private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs 
to be in a consistent manner
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala 
b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
deleted file mode 100644
index 9f188fe33b7..00000000000
--- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.controller
-
-import com.typesafe.scalalogging.Logger
-import kafka.utils.Logging
-
-object StateChangeLogger {
-  private val logger = Logger("state.change.logger")
-}
-
-/**
- * Simple class that sets `logIdent` appropriately depending on whether the 
state change logger is being used in the
- * context of the KafkaController or not (e.g. ReplicaManager and 
MetadataCache log to the state change logger
- * irrespective of whether the broker is the Controller).
- */
-class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, 
controllerEpoch: Option[Int]) extends Logging {
-
-  if (controllerEpoch.isDefined && !inControllerContext)
-    throw new IllegalArgumentException("Controller epoch should only be 
defined if inControllerContext is true")
-
-  override lazy val logger: Logger = StateChangeLogger.logger
-
-  locally {
-    val prefix = if (inControllerContext) "Controller" else "Broker"
-    val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch")
-    logIdent = s"[$prefix id=$brokerId$epochEntry] "
-  }
-
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 10b41b88bd0..ff70d7ae34a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import com.yammer.metrics.core.Meter
 import kafka.cluster.{Partition, PartitionListener}
-import kafka.controller.StateChangeLogger
 import kafka.log.LogManager
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
@@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
+import org.apache.kafka.logger.StateChangeLogger
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, 
StopPartition}
@@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile private var isInControlledShutdown = false
 
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
-  protected val stateChangeLogger = new StateChangeLogger(localBrokerId, 
inControllerContext = false, None)
+  protected val stateChangeLogger = new StateChangeLogger(localBrokerId)
 
   private var logDirFailureHandler: LogDirFailureHandler = _
 
@@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig,
             hasCustomErrorMessage = customException.isDefined
           )
       }
-      // In non-transaction paths, errorResults is typically empty, so we can 
+      // In non-transaction paths, errorResults is typically empty, so we can
       // directly use entriesPerPartition instead of creating a new filtered 
collection
-      val entriesWithoutErrorsPerPartition = 
+      val entriesWithoutErrorsPerPartition =
         if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _) 
=> !errorResults.contains(key) }
         else entriesPerPartition
 
@@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig,
                                    remoteFetchPartitionStatus: 
Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
     val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
     val remoteFetchResults = new util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]
-    
+
     remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
       val (task, result) = processRemoteFetch(remoteFetchInfo)
       remoteFetchTasks.put(topicIdPartition, task)
       remoteFetchResults.put(topicIdPartition, result)
     }
-    
+
     val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
     val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, 
remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
       remoteFetchPartitionStatus, params, logReadResults, this, 
responseCallback)
diff --git 
a/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java 
b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java
new file mode 100644
index 00000000000..a8f7ed9cc9d
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.logger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple logger wrapper that writes state change messages to the well-known
+ * "state.change.logger" logger, prefixed with the broker id (e.g. used by ReplicaManager and Partition).
+ */
+public class StateChangeLogger {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger("state.change.logger");
+
+    private final String logIdent;
+
+    public StateChangeLogger(int brokerId) {
+        this.logIdent = String.format("[Broker id=%d] ", brokerId);
+    }
+
+    public void trace(String message) {
+        LOGGER.trace("{}{}", logIdent, message);
+    }
+
+    public void info(String message) {
+        LOGGER.info("{}{}", logIdent, message);
+    }
+
+    public void warn(String message) {
+        LOGGER.warn("{}{}", logIdent, message);
+    }
+
+    public void error(String message) {
+        LOGGER.error("{}{}", logIdent, message);
+    }
+
+    public void error(String message, Throwable e) {
+        LOGGER.error("{}{}", logIdent, message, e);
+    }
+}

Reply via email to