This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6819c584f [MINOR] feat(coordinator/server): Support RPC from-address 
to audit log and refactor RpcAuditContext (#2013)
6819c584f is described below

commit 6819c584fa06fa566353726a6f7bb06cc9571097
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Wed Aug 28 10:59:46 2024 +0800

    [MINOR] feat(coordinator/server): Support RPC from-address to audit log and 
refactor RpcAuditContext (#2013)
    
    ### What changes were proposed in this pull request?
    
    - Record the RPC from-address (the caller's remote address) in the audit log
    - Refactor RpcAuditContext
    
    ### Why are the changes needed?
    
    Add the caller's address (`from`) to each audited call to make troubleshooting easier.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Start the coordinator and the shuffle servers, then check the RPC audit logs.
    
    
    ```
    # coordinator_rpc_audit.log
    [2024-08-05 19:12:19.532] cmd=registerApplicationInfo   statusCode=SUCCESS  
    from=/x.x.x.x:49416     executionTimeUs=1514    
appId=app-20240805190949-0009_1722856187872     args{user=user}
    
    # server_rpc_audit.log
    [2024-08-05 19:12:21.511] cmd=requireBuffer     statusCode=SUCCESS      
from=/x.x.x.x:43904     executionTimeUs=1294    
appId=app-20240805190949-0009_1722856187872     shuffleId=0     
args{requireSize=233, partitionIdsSize=1}
    
    [2024-08-05 19:12:21.778] cmd=sendShuffleData   statusCode=SUCCESS      
from=/x.x.x.x:39340     executionTimeUs=25464   
appId=app-20240805190949-0009_1722856187872     shuffleId=0     
args{requireBufferId=4, requireSize=233, isPreAllocated=true, 
requireBlocksSize=39, stageAttemptNumber=0}
    
    ```
---
 .../uniffle/common/audit/RpcAuditContext.java      |  86 ++++++-------
 .../common/rpc/ClientContextServerInterceptor.java |  72 +++++++++++
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |   1 +
 .../coordinator/CoordinatorGrpcService.java        |  45 ++++---
 .../audit/CoordinatorRpcAuditContext.java          |  30 ++++-
 .../uniffle/server/ShuffleServerGrpcService.java   |  48 ++++----
 .../server/audit/ServerRPCAuditContext.java        | 134 ---------------------
 .../server/audit/ServerRpcAuditContext.java        |  36 +++++-
 .../server/netty/ShuffleServerNettyHandler.java    |  28 +++--
 9 files changed, 239 insertions(+), 241 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
 b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
similarity index 60%
rename from 
coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
rename to 
common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
index a66bd2805..fbe85efd0 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
+++ b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
@@ -15,40 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.coordinator.audit;
+package org.apache.uniffle.common.audit;
+
+import java.io.Closeable;
 
 import org.slf4j.Logger;
 
-import org.apache.uniffle.common.audit.AuditContext;
 import org.apache.uniffle.common.rpc.StatusCode;
 
-/** An audit context for coordinator rpc. */
-public class CoordinatorRPCAuditContext implements AuditContext {
+/** Context for rpc audit logging. */
+public abstract class RpcAuditContext implements Closeable {
   private final Logger log;
   private String command;
   private String statusCode;
-  private long creationTimeNs;
-  private long executionTimeNs;
-  private String appId = "N/A";
-  private int shuffleId = -1;
   private String args;
+  private String returnValue;
+  private String from;
+  private long creationTimeNs;
+  protected long executionTimeNs;
 
-  /**
-   * Constructor of {@link CoordinatorRPCAuditContext}.
-   *
-   * @param log the logger to log the audit information
-   */
-  public CoordinatorRPCAuditContext(Logger log) {
+  public RpcAuditContext(Logger log) {
     this.log = log;
   }
 
+  protected abstract String content();
+
   /**
    * Sets mCommand field.
    *
    * @param command the command associated with shuffle server rpc
-   * @return this {@link AuditContext} instance
+   * @return this {@link RpcAuditContext} instance
    */
-  public CoordinatorRPCAuditContext withCommand(String command) {
+  public RpcAuditContext withCommand(String command) {
     this.command = command;
     return this;
   }
@@ -58,9 +56,9 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    *
    * @param creationTimeNs the System.nanoTime() when this operation create, 
it only can be used to
    *     compute operation mExecutionTime
-   * @return this {@link AuditContext} instance
+   * @return this {@link RpcAuditContext} instance
    */
-  public CoordinatorRPCAuditContext withCreationTimeNs(long creationTimeNs) {
+  public RpcAuditContext withCreationTimeNs(long creationTimeNs) {
     this.creationTimeNs = creationTimeNs;
     return this;
   }
@@ -69,9 +67,9 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * Sets status code field.
    *
    * @param statusCode the status code
-   * @return this {@link AuditContext} instance
+   * @return this {@link RpcAuditContext} instance
    */
-  public CoordinatorRPCAuditContext withStatusCode(StatusCode statusCode) {
+  public RpcAuditContext withStatusCode(StatusCode statusCode) {
     if (statusCode == null) {
       this.statusCode = "UNKNOWN";
     } else {
@@ -84,10 +82,9 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * Sets status code field.
    *
    * @param statusCode the status code
-   * @return this {@link AuditContext} instance
+   * @return this {@link RpcAuditContext} instance
    */
-  public CoordinatorRPCAuditContext withStatusCode(
-      org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
+  public RpcAuditContext 
withStatusCode(org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
     if (statusCode == null) {
       this.statusCode = "UNKNOWN";
     } else {
@@ -100,13 +97,28 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * Sets status code field.
    *
    * @param statusCode the status code
-   * @return this {@link AuditContext} instance
+   * @return this {@link RpcAuditContext} instance
    */
-  public CoordinatorRPCAuditContext withStatusCode(String statusCode) {
+  public RpcAuditContext withStatusCode(String statusCode) {
     this.statusCode = statusCode;
     return this;
   }
 
+  public RpcAuditContext withArgs(String args) {
+    this.args = args;
+    return this;
+  }
+
+  public RpcAuditContext withReturnValue(String returnValue) {
+    this.returnValue = returnValue;
+    return this;
+  }
+
+  public RpcAuditContext withFrom(String from) {
+    this.from = from;
+    return this;
+  }
+
   @Override
   public void close() {
     if (log == null) {
@@ -120,26 +132,14 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
   public String toString() {
     String line =
         String.format(
-            
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
-            command, statusCode, appId, shuffleId, executionTimeNs / 1000);
+            "cmd=%s\tstatusCode=%s\tfrom=%s\texecutionTimeUs=%d\t%s",
+            command, statusCode, from, executionTimeNs / 1000, content());
     if (args != null) {
-      line += String.format("args{%s}", args);
+      line += String.format("\targs{%s}", args);
+    }
+    if (returnValue != null) {
+      line += String.format("\treturn{%s}", returnValue);
     }
     return line;
   }
-
-  public CoordinatorRPCAuditContext withAppId(String appId) {
-    this.appId = appId;
-    return this;
-  }
-
-  public CoordinatorRPCAuditContext withShuffleId(int shuffleId) {
-    this.shuffleId = shuffleId;
-    return this;
-  }
-
-  public CoordinatorRPCAuditContext withArgs(String args) {
-    this.args = args;
-    return this;
-  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
 
b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
new file mode 100644
index 000000000..e8c064a1c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.rpc;
+
+import javax.annotation.Nullable;
+
+import io.grpc.ForwardingServerCallListener;
+import io.grpc.Grpc;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+/**
+ * Server side interceptor that is used to put remote client's IP Address to 
thread local storage.
+ */
+public class ClientContextServerInterceptor implements ServerInterceptor {
+
+  /**
+   * A {@link ThreadLocal} variable to maintain the client's IP address along 
with a specific
+   * thread.
+   */
+  private static final ThreadLocal<String> IP_ADDRESS_THREAD_LOCAL = new 
ThreadLocal<>();
+
+  /** @return IP address of the gRPC client that is making the call */
+  @Nullable public static String getIpAddress() {
+    return IP_ADDRESS_THREAD_LOCAL.get();
+  }
+
+  @Override
+  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
+      ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, 
RespT> next) {
+    /**
+     * For streaming calls, below will make sure remote IP address and client 
version are injected
+     * prior to creating the stream.
+     */
+    setRemoteIpAddress(call);
+
+    /**
+     * For non-streaming calls to server, below listener will be invoked in 
the same thread that is
+     * serving the call.
+     */
+    return new 
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
+        next.startCall(call, headers)) {
+      @Override
+      public void onHalfClose() {
+        setRemoteIpAddress(call);
+        super.onHalfClose();
+      }
+    };
+  }
+
+  private <ReqT, RespT> void setRemoteIpAddress(ServerCall<ReqT, RespT> call) {
+    String remoteIpAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
+    IP_ADDRESS_THREAD_LOCAL.set(remoteIpAddress);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index cf1f05fea..0929fa248 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -118,6 +118,7 @@ public class GrpcServer implements ServerInterface {
     servicesWithInterceptors.forEach(
         (serviceWithInterceptors) -> {
           List<ServerInterceptor> interceptors = 
serviceWithInterceptors.getRight();
+          interceptors.add(new ClientContextServerInterceptor());
           if (isMetricsEnabled) {
             MonitoringServerInterceptor monitoringInterceptor =
                 new MonitoringServerInterceptor(grpcMetrics);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d618211e0..9f7c5cdd4 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
+import org.apache.uniffle.common.rpc.ClientContextServerInterceptor;
 import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
-import org.apache.uniffle.coordinator.audit.CoordinatorRPCAuditContext;
+import org.apache.uniffle.coordinator.audit.CoordinatorRpcAuditContext;
 import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
 import 
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
@@ -124,7 +125,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void getShuffleAssignments(
       GetShuffleServerRequest request,
       StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("getShuffleAssignments")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("getShuffleAssignments")) {
       final String appId = request.getApplicationId();
       final int shuffleId = request.getShuffleId();
       final int partitionNum = request.getPartitionNum();
@@ -135,11 +136,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
       final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
 
-      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withAppId(appId);
       auditContext.withArgs(
           String.format(
-              "partitionNum=%d, partitionNumPerRange=%d, replica=%d, 
requiredTags=%s, "
+              "shuffleId=%d, partitionNum=%d, partitionNumPerRange=%d, 
replica=%d, requiredTags=%s, "
                   + "requiredShuffleServerNumber=%d, faultyServerIds=%s, 
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
+              shuffleId,
               partitionNum,
               partitionNumPerRange,
               replica,
@@ -216,7 +218,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void heartbeat(
       ShuffleServerHeartBeatRequest request,
       StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("heartbeat")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("heartbeat")) {
       final ServerNode serverNode = toServerNode(request);
       auditContext.withArgs("serverNode=" + serverNode.getId());
       coordinatorServer.getClusterManager().add(serverNode);
@@ -237,7 +239,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void checkServiceAvailable(
       Empty request, StreamObserver<CheckServiceAvailableResponse> 
responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("checkServiceAvailable")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("checkServiceAvailable")) {
       final CheckServiceAvailableResponse response =
           CheckServiceAvailableResponse.newBuilder()
               
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
@@ -252,7 +254,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void reportClientOperation(
       ReportShuffleClientOpRequest request,
       StreamObserver<ReportShuffleClientOpResponse> responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("reportClientOperation")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("reportClientOperation")) {
       final String clientHost = request.getClientHost();
       final int clientPort = request.getClientPort();
       final ShuffleServerId shuffleServer = request.getServer();
@@ -275,7 +277,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void appHeartbeat(
       AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> 
responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
       String appId = request.getAppId();
       auditContext.withAppId(appId);
       coordinatorServer.getApplicationManager().refreshAppId(appId);
@@ -302,7 +304,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void registerApplicationInfo(
       ApplicationInfoRequest request, StreamObserver<ApplicationInfoResponse> 
responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("registerApplicationInfo")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("registerApplicationInfo")) {
       String appId = request.getAppId();
       String user = request.getUser();
       auditContext.withAppId(appId).withArgs("user=" + user);
@@ -332,7 +334,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void accessCluster(
       AccessClusterRequest request, StreamObserver<AccessClusterResponse> 
responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("accessCluster")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("accessCluster")) {
       StatusCode statusCode = StatusCode.SUCCESS;
       AccessClusterResponse response;
       AccessManager accessManager = coordinatorServer.getAccessManager();
@@ -376,7 +378,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void fetchClientConf(
       Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConf")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("fetchClientConf")) {
       fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, 
responseObserver);
       auditContext.withStatusCode(StatusCode.SUCCESS);
     }
@@ -385,7 +387,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   @Override
   public void fetchClientConfV2(
       FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> 
responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConfV2")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("fetchClientConfV2")) {
       fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), 
responseObserver);
       auditContext.withStatusCode(StatusCode.SUCCESS);
     }
@@ -426,7 +428,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void fetchRemoteStorage(
       FetchRemoteStorageRequest request,
       StreamObserver<FetchRemoteStorageResponse> responseObserver) {
-    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchRemoteStorage")) {
+    try (CoordinatorRpcAuditContext auditContext = 
createAuditContext("fetchRemoteStorage")) {
       FetchRemoteStorageResponse response;
       StatusCode status = StatusCode.SUCCESS;
       String appId = request.getAppId();
@@ -523,20 +525,23 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   }
 
   /**
-   * Creates a {@link CoordinatorRPCAuditContext} instance.
+   * Creates a {@link CoordinatorRpcAuditContext} instance.
    *
-   * @param command the command to be logged by this {@link AuditContext}
-   * @return newly-created {@link CoordinatorRPCAuditContext} instance
+   * @param command the command to be logged by this {@link RpcAuditContext}
+   * @return newly-created {@link CoordinatorRpcAuditContext} instance
    */
-  private CoordinatorRPCAuditContext createAuditContext(String command) {
+  private CoordinatorRpcAuditContext createAuditContext(String command) {
     // Audit log may be enabled during runtime
     Logger auditLogger = null;
     if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
       auditLogger = AUDIT_LOGGER;
     }
-    CoordinatorRPCAuditContext auditContext = new 
CoordinatorRPCAuditContext(auditLogger);
+    CoordinatorRpcAuditContext auditContext = new 
CoordinatorRpcAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+      auditContext
+          .withCommand(command)
+          .withFrom(ClientContextServerInterceptor.getIpAddress())
+          .withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
similarity index 55%
copy from common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
copy to 
coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
index 77fbb9374..a23ae2ba3 100644
--- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
@@ -15,12 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.audit;
+package org.apache.uniffle.coordinator.audit;
 
-import java.io.Closeable;
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.RpcAuditContext;
+
+/** An audit context for coordinator rpc. */
+public class CoordinatorRpcAuditContext extends RpcAuditContext {
+  private String appId = "N/A";
+
+  /**
+   * Constructor of {@link CoordinatorRpcAuditContext}.
+   *
+   * @param log the logger to log the audit information
+   */
+  public CoordinatorRpcAuditContext(Logger log) {
+    super(log);
+  }
 
-/** Context for audit logging. */
-public interface AuditContext extends Closeable {
   @Override
-  void close();
+  protected String content() {
+    return String.format("appId=%s", appId);
+  }
+
+  public CoordinatorRpcAuditContext withAppId(String appId) {
+    this.appId = appId;
+    return this;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b0fe9d9ef..5cce3a3a8 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -47,13 +47,14 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
 import org.apache.uniffle.common.config.RssBaseConf;
 import 
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.exception.NoBufferException;
 import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
 import org.apache.uniffle.common.exception.NoRegisterException;
+import org.apache.uniffle.common.rpc.ClientContextServerInterceptor;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ByteBufUtils;
@@ -90,7 +91,7 @@ import 
org.apache.uniffle.proto.RssProtos.ShufflePartitionRange;
 import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
 import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
 import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
-import org.apache.uniffle.server.audit.ServerRPCAuditContext;
+import org.apache.uniffle.server.audit.ServerRpcAuditContext;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
@@ -125,7 +126,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   public void unregisterShuffleByAppId(
       RssProtos.ShuffleUnregisterByAppIdRequest request,
       StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse> 
responseStreamObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("unregisterShuffleByAppId")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("unregisterShuffleByAppId")) {
       String appId = request.getAppId();
       auditContext.withAppId(appId);
       StatusCode status = verifyRequest(appId);
@@ -163,7 +164,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   public void unregisterShuffle(
       RssProtos.ShuffleUnregisterRequest request,
       StreamObserver<RssProtos.ShuffleUnregisterResponse> 
responseStreamObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("unregisterShuffle")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("unregisterShuffle")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -200,7 +201,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void registerShuffle(
       ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("registerShuffle")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("registerShuffle")) {
       ShuffleRegisterResponse reply;
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
@@ -306,7 +307,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void sendShuffleData(
       SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("sendShuffleData")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("sendShuffleData")) {
       SendShuffleDataResponse reply;
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
@@ -519,7 +520,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void commitShuffleTask(
       ShuffleCommitRequest req, StreamObserver<ShuffleCommitResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("commitShuffleTask")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("commitShuffleTask")) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -576,7 +577,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void finishShuffle(
       FinishShuffleRequest req, StreamObserver<FinishShuffleResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("finishShuffle")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("finishShuffle")) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -624,7 +625,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void requireBuffer(
       RequireBufferRequest request, StreamObserver<RequireBufferResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("requireBuffer")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("requireBuffer")) {
       String appId = request.getAppId();
       auditContext.withAppId(appId).withShuffleId(request.getShuffleId());
       String auditArgs = "requireSize=" + request.getRequireSize();
@@ -704,7 +705,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void appHeartbeat(
       AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> 
responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
       String appId = request.getAppId();
       auditContext.withAppId(appId);
       StatusCode status = verifyRequest(appId);
@@ -745,7 +746,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   public void reportShuffleResult(
       ReportShuffleResultRequest request,
       StreamObserver<ReportShuffleResultResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("reportShuffleResult")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("reportShuffleResult")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       long taskAttemptId = request.getTaskAttemptId();
@@ -825,7 +826,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void getShuffleResult(
       GetShuffleResultRequest request, 
StreamObserver<GetShuffleResultResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("getShuffleResult")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("getShuffleResult")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       int partitionId = request.getPartitionId();
@@ -892,7 +893,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   public void getShuffleResultForMultiPart(
       GetShuffleResultForMultiPartRequest request,
       StreamObserver<GetShuffleResultForMultiPartResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("getShuffleResultForMultiPart")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("getShuffleResultForMultiPart")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       List<Integer> partitionsList = request.getPartitionsList();
@@ -962,7 +963,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   public void getLocalShuffleData(
       GetLocalShuffleDataRequest request,
       StreamObserver<GetLocalShuffleDataResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = 
createAuditContext("getLocalShuffleData")) {
+    try (ServerRpcAuditContext auditContext = 
createAuditContext("getLocalShuffleData")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       int partitionId = request.getPartitionId();
@@ -1108,7 +1109,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
   public void getLocalShuffleIndex(
       GetLocalShuffleIndexRequest request,
       StreamObserver<GetLocalShuffleIndexResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       int partitionId = request.getPartitionId();
@@ -1232,7 +1233,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
   public void getMemoryShuffleData(
       GetMemoryShuffleDataRequest request,
       StreamObserver<GetMemoryShuffleDataResponse> responseObserver) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("getMemoryShuffleData")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
       int partitionId = request.getPartitionId();
@@ -1450,20 +1451,23 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
   }
 
   /**
-   * Creates a {@link ServerRPCAuditContext} instance.
+   * Creates a {@link ServerRpcAuditContext} instance.
    *
-   * @param command the command to be logged by this {@link AuditContext}
-   * @return newly-created {@link ServerRPCAuditContext} instance
+   * @param command the command to be logged by this {@link RpcAuditContext}
+   * @return newly-created {@link ServerRpcAuditContext} instance
    */
-  private ServerRPCAuditContext createAuditContext(String command) {
+  private ServerRpcAuditContext createAuditContext(String command) {
     // Audit log may be enabled during runtime
     Logger auditLogger = null;
     if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
       auditLogger = AUDIT_LOGGER;
     }
-    ServerRPCAuditContext auditContext = new ServerRPCAuditContext(auditLogger);
+    ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+      auditContext
+          .withCommand(command)
+          .withFrom(ClientContextServerInterceptor.getIpAddress())
+          .withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
deleted file mode 100644
index b78c606a5..000000000
--- a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.server.audit;
-
-import org.slf4j.Logger;
-
-import org.apache.uniffle.common.audit.AuditContext;
-import org.apache.uniffle.common.rpc.StatusCode;
-
-/** An audit context for shuffle server rpc. */
-public class ServerRPCAuditContext implements AuditContext {
-  private final Logger log;
-  private String command;
-  private String statusCode;
-  private long creationTimeNs;
-  private long executionTimeNs;
-  private String appId = "N/A";
-  private int shuffleId = -1;
-  private String args;
-  private String returnValue;
-
-  /**
-   * Constructor of {@link ServerRPCAuditContext}.
-   *
-   * @param log the logger to log the audit information
-   */
-  public ServerRPCAuditContext(Logger log) {
-    this.log = log;
-  }
-
-  /**
-   * Sets mCommand field.
-   *
-   * @param command the command associated with shuffle server rpc
-   * @return this {@link AuditContext} instance
-   */
-  public ServerRPCAuditContext withCommand(String command) {
-    this.command = command;
-    return this;
-  }
-
-  /**
-   * Sets creationTimeNs field.
-   *
-   * @param creationTimeNs the System.nanoTime() when this operation create, it only can be used to
-   *     compute operation mExecutionTime
-   * @return this {@link AuditContext} instance
-   */
-  public ServerRPCAuditContext withCreationTimeNs(long creationTimeNs) {
-    this.creationTimeNs = creationTimeNs;
-    return this;
-  }
-
-  /**
-   * Sets status code field.
-   *
-   * @param statusCode the status code
-   * @return this {@link AuditContext} instance
-   */
-  public ServerRPCAuditContext withStatusCode(StatusCode statusCode) {
-    this.statusCode = statusCode.name();
-    return this;
-  }
-
-  /**
-   * Sets status code field.
-   *
-   * @param statusCode the status code
-   * @return this {@link AuditContext} instance
-   */
-  public ServerRPCAuditContext withStatusCode(String statusCode) {
-    this.statusCode = statusCode;
-    return this;
-  }
-
-  @Override
-  public void close() {
-    if (log == null) {
-      return;
-    }
-    executionTimeNs = System.nanoTime() - creationTimeNs;
-    log.info(toString());
-  }
-
-  @Override
-  public String toString() {
-    String line =
-        String.format(
-            "cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d",
-            command, statusCode, appId, shuffleId, executionTimeNs / 1000);
-    if (args != null) {
-      line += String.format("\targs{%s}", args);
-    }
-    if (returnValue != null) {
-      line += String.format("\treturn{%s}", returnValue);
-    }
-    return line;
-  }
-
-  public ServerRPCAuditContext withAppId(String appId) {
-    this.appId = appId;
-    return this;
-  }
-
-  public ServerRPCAuditContext withShuffleId(int shuffleId) {
-    this.shuffleId = shuffleId;
-    return this;
-  }
-
-  public ServerRPCAuditContext withArgs(String args) {
-    this.args = args;
-    return this;
-  }
-
-  public ServerRPCAuditContext withReturnValue(String returnValue) {
-    this.returnValue = returnValue;
-    return this;
-  }
-}
diff --git a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
similarity index 50%
rename from common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
rename to server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
index 77fbb9374..00c98f518 100644
--- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
+++ b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
@@ -15,12 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.audit;
+package org.apache.uniffle.server.audit;
 
-import java.io.Closeable;
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.RpcAuditContext;
+
+/** An audit context for shuffle server rpc. */
+public class ServerRpcAuditContext extends RpcAuditContext {
+  private String appId = "N/A";
+  private int shuffleId = -1;
+
+  /**
+   * Constructor of {@link ServerRpcAuditContext}.
+   *
+   * @param log the logger to log the audit information
+   */
+  public ServerRpcAuditContext(Logger log) {
+    super(log);
+  }
 
-/** Context for audit logging. */
-public interface AuditContext extends Closeable {
   @Override
-  void close();
+  protected String content() {
+    return String.format("appId=%s\tshuffleId=%s", appId, shuffleId);
+  }
+
+  public ServerRpcAuditContext withAppId(String appId) {
+    this.appId = appId;
+    return this;
+  }
+
+  public ServerRpcAuditContext withShuffleId(int shuffleId) {
+    this.shuffleId = shuffleId;
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index fdb723dfb..fb4d824d9 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -40,7 +40,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
 import org.apache.uniffle.common.exception.FileNotFoundException;
@@ -65,7 +65,7 @@ import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskInfo;
 import org.apache.uniffle.server.ShuffleTaskManager;
-import org.apache.uniffle.server.audit.ServerRPCAuditContext;
+import org.apache.uniffle.server.audit.ServerRpcAuditContext;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.storage.common.Storage;
@@ -121,7 +121,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
   }
 
   public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("sendShuffleData")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData", client)) {
       RpcResponse rpcResponse;
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
@@ -386,7 +386,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
 
   public void handleGetMemoryShuffleDataRequest(
       TransportClient client, GetMemoryShuffleDataRequest req) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("getMemoryShuffleData")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData", client)) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       int partitionId = req.getPartitionId();
@@ -497,7 +497,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
 
   public void handleGetLocalShuffleIndexRequest(
       TransportClient client, GetLocalShuffleIndexRequest req) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex", client)) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       int partitionId = req.getPartitionId();
@@ -607,7 +607,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
   }
 
   public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
-    try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleData")) {
+    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData", client)) {
       GetLocalShuffleDataResponse response;
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
@@ -879,20 +879,24 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
   }
 
   /**
-   * Creates a {@link ServerRPCAuditContext} instance.
+   * Creates a {@link ServerRpcAuditContext} instance.
    *
-   * @param command the command to be logged by this {@link AuditContext}
-   * @return newly-created {@link ServerRPCAuditContext} instance
+   * @param command the command to be logged by this {@link RpcAuditContext}
+   * @return newly-created {@link ServerRpcAuditContext} instance
    */
-  private ServerRPCAuditContext createAuditContext(String command) {
+  private ServerRpcAuditContext createAuditContext(
+      String command, TransportClient transportClient) {
     // Audit log may be enabled during runtime
     Logger auditLogger = null;
     if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
       auditLogger = AUDIT_LOGGER;
     }
-    ServerRPCAuditContext auditContext = new ServerRPCAuditContext(auditLogger);
+    ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+      auditContext
+          .withCommand(command)
+          .withFrom(transportClient.getSocketAddress().toString())
+          .withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }


Reply via email to