This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 6819c584f [MINOR] feat(coordinator/server): Support RPC from-address to audit log and refactor RpcAuditContext (#2013) 6819c584f is described below commit 6819c584fa06fa566353726a6f7bb06cc9571097 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Wed Aug 28 10:59:46 2024 +0800 [MINOR] feat(coordinator/server): Support RPC from-address to audit log and refactor RpcAuditContext (#2013) ### What changes were proposed in this pull request? - Support RPC from-address to audit log - Do refactor to RpcAuditContext ### Why are the changes needed? Add from information for each call to make it easy for trouble-shooting. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Start coordinator\servers and check the rpc auditlog ``` # coordinator_rpc_audit.log [2024-08-05 19:12:19.532] cmd=registerApplicationInfo statusCode=SUCCESS from=/x.x.x.x:49416 executionTimeUs=1514 appId=app-20240805190949-0009_1722856187872 args{user=user} # server_rpc_audit.log [2024-08-05 19:12:21.511] cmd=requireBuffer statusCode=SUCCESS from=/x.x.x.x:43904 executionTimeUs=1294 appId=app-20240805190949-0009_1722856187872 shuffleId=0 args{requireSize=233, partitionIdsSize=1} [2024-08-05 19:12:21.778] cmd=sendShuffleData statusCode=SUCCESS from=/x.x.x.x:39340 executionTimeUs=25464 appId=app-20240805190949-0009_1722856187872 shuffleId=0 args{requireBufferId=4, requireSize=233, isPreAllocated=true, requireBlocksSize=39, stageAttemptNumber=0} ``` --- .../uniffle/common/audit/RpcAuditContext.java | 86 ++++++------- .../common/rpc/ClientContextServerInterceptor.java | 72 +++++++++++ .../org/apache/uniffle/common/rpc/GrpcServer.java | 1 + .../coordinator/CoordinatorGrpcService.java | 45 ++++--- .../audit/CoordinatorRpcAuditContext.java | 30 ++++- .../uniffle/server/ShuffleServerGrpcService.java | 48 ++++---- .../server/audit/ServerRPCAuditContext.java | 134 --------------------- .../server/audit/ServerRpcAuditContext.java | 36 +++++- .../server/netty/ShuffleServerNettyHandler.java | 28 +++-- 9 files changed, 239 insertions(+), 241 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java similarity index 60% rename from coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java rename to common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java index a66bd2805..fbe85efd0 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java +++ b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java @@ -15,40 +15,38 @@ * limitations under the License. */ -package org.apache.uniffle.coordinator.audit; +package org.apache.uniffle.common.audit; + +import java.io.Closeable; import org.slf4j.Logger; -import org.apache.uniffle.common.audit.AuditContext; import org.apache.uniffle.common.rpc.StatusCode; -/** An audit context for coordinator rpc. */ -public class CoordinatorRPCAuditContext implements AuditContext { +/** Context for rpc audit logging. */ +public abstract class RpcAuditContext implements Closeable { private final Logger log; private String command; private String statusCode; - private long creationTimeNs; - private long executionTimeNs; - private String appId = "N/A"; - private int shuffleId = -1; private String args; + private String returnValue; + private String from; + private long creationTimeNs; + protected long executionTimeNs; - /** - * Constructor of {@link CoordinatorRPCAuditContext}. - * - * @param log the logger to log the audit information - */ - public CoordinatorRPCAuditContext(Logger log) { + public RpcAuditContext(Logger log) { this.log = log; } + protected abstract String content(); + /** * Sets mCommand field. * * @param command the command associated with shuffle server rpc - * @return this {@link AuditContext} instance + * @return this {@link RpcAuditContext} instance */ - public CoordinatorRPCAuditContext withCommand(String command) { + public RpcAuditContext withCommand(String command) { this.command = command; return this; } @@ -58,9 +56,9 @@ public class CoordinatorRPCAuditContext implements AuditContext { * * @param creationTimeNs the System.nanoTime() when this operation create, it only can be used to * compute operation mExecutionTime - * @return this {@link AuditContext} instance + * @return this {@link RpcAuditContext} instance */ - public CoordinatorRPCAuditContext withCreationTimeNs(long creationTimeNs) { + public RpcAuditContext withCreationTimeNs(long creationTimeNs) { this.creationTimeNs = creationTimeNs; return this; } @@ -69,9 +67,9 @@ public class CoordinatorRPCAuditContext implements AuditContext { * Sets status code field. * * @param statusCode the status code - * @return this {@link AuditContext} instance + * @return this {@link RpcAuditContext} instance */ - public CoordinatorRPCAuditContext withStatusCode(StatusCode statusCode) { + public RpcAuditContext withStatusCode(StatusCode statusCode) { if (statusCode == null) { this.statusCode = "UNKNOWN"; } else { @@ -84,10 +82,9 @@ public class CoordinatorRPCAuditContext implements AuditContext { * Sets status code field. * * @param statusCode the status code - * @return this {@link AuditContext} instance + * @return this {@link RpcAuditContext} instance */ - public CoordinatorRPCAuditContext withStatusCode( - org.apache.uniffle.proto.RssProtos.StatusCode statusCode) { + public RpcAuditContext withStatusCode(org.apache.uniffle.proto.RssProtos.StatusCode statusCode) { if (statusCode == null) { this.statusCode = "UNKNOWN"; } else { @@ -100,13 +97,28 @@ public class CoordinatorRPCAuditContext implements AuditContext { * Sets status code field. * * @param statusCode the status code - * @return this {@link AuditContext} instance + * @return this {@link RpcAuditContext} instance */ - public CoordinatorRPCAuditContext withStatusCode(String statusCode) { + public RpcAuditContext withStatusCode(String statusCode) { this.statusCode = statusCode; return this; } + public RpcAuditContext withArgs(String args) { + this.args = args; + return this; + } + + public RpcAuditContext withReturnValue(String returnValue) { + this.returnValue = returnValue; + return this; + } + + public RpcAuditContext withFrom(String from) { + this.from = from; + return this; + } + @Override public void close() { if (log == null) { @@ -120,26 +132,14 @@ public class CoordinatorRPCAuditContext implements AuditContext { public String toString() { String line = String.format( - "cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t", - command, statusCode, appId, shuffleId, executionTimeNs / 1000); + "cmd=%s\tstatusCode=%s\tfrom=%s\texecutionTimeUs=%d\t%s", + command, statusCode, from, executionTimeNs / 1000, content()); if (args != null) { - line += String.format("args{%s}", args); + line += String.format("\targs{%s}", args); + } + if (returnValue != null) { + line += String.format("\treturn{%s}", returnValue); } return line; } - - public CoordinatorRPCAuditContext withAppId(String appId) { - this.appId = appId; - return this; - } - - public CoordinatorRPCAuditContext withShuffleId(int shuffleId) { - this.shuffleId = shuffleId; - return this; - } - - public CoordinatorRPCAuditContext withArgs(String args) { - this.args = args; - return this; - } } diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java new file mode 100644 index 000000000..e8c064a1c --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.rpc; + +import javax.annotation.Nullable; + +import io.grpc.ForwardingServerCallListener; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +/** + * Server side interceptor that is used to put remote client's IP Address to thread local storage. + */ +public class ClientContextServerInterceptor implements ServerInterceptor { + + /** + * A {@link ThreadLocal} variable to maintain the client's IP address along with a specific + * thread. + */ + private static final ThreadLocal<String> IP_ADDRESS_THREAD_LOCAL = new ThreadLocal<>(); + + /** @return IP address of the gRPC client that is making the call */ + @Nullable public static String getIpAddress() { + return IP_ADDRESS_THREAD_LOCAL.get(); + } + + @Override + public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( + ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { + /** + * For streaming calls, below will make sure remote IP address and client version are injected + * prior to creating the stream. + */ + setRemoteIpAddress(call); + + /** + * For non-streaming calls to server, below listener will be invoked in the same thread that is + * serving the call. + */ + return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( + next.startCall(call, headers)) { + @Override + public void onHalfClose() { + setRemoteIpAddress(call); + super.onHalfClose(); + } + }; + } + + private <ReqT, RespT> void setRemoteIpAddress(ServerCall<ReqT, RespT> call) { + String remoteIpAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString(); + IP_ADDRESS_THREAD_LOCAL.set(remoteIpAddress); + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java index cf1f05fea..0929fa248 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java @@ -118,6 +118,7 @@ public class GrpcServer implements ServerInterface { servicesWithInterceptors.forEach( (serviceWithInterceptors) -> { List<ServerInterceptor> interceptors = serviceWithInterceptors.getRight(); + interceptors.add(new ClientContextServerInterceptor()); if (isMetricsEnabled) { MonitoringServerInterceptor monitoringInterceptor = new MonitoringServerInterceptor(grpcMetrics); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index d618211e0..9f7c5cdd4 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ServerStatus; -import org.apache.uniffle.common.audit.AuditContext; +import org.apache.uniffle.common.audit.RpcAuditContext; +import org.apache.uniffle.common.rpc.ClientContextServerInterceptor; import org.apache.uniffle.common.storage.StorageInfoUtils; import org.apache.uniffle.coordinator.access.AccessCheckResult; import org.apache.uniffle.coordinator.access.AccessInfo; -import org.apache.uniffle.coordinator.audit.CoordinatorRPCAuditContext; +import org.apache.uniffle.coordinator.audit.CoordinatorRpcAuditContext; import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo; import org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment; import org.apache.uniffle.coordinator.util.CoordinatorUtils; @@ -124,7 +125,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer public void getShuffleAssignments( GetShuffleServerRequest request, StreamObserver<GetShuffleAssignmentsResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("getShuffleAssignments")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("getShuffleAssignments")) { final String appId = request.getApplicationId(); final int shuffleId = request.getShuffleId(); final int partitionNum = request.getPartitionNum(); @@ -135,11 +136,12 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer final int estimateTaskConcurrency = request.getEstimateTaskConcurrency(); final Set<String> faultyServerIds = new HashSet<>(request.getFaultyServerIdsList()); - auditContext.withAppId(appId).withShuffleId(shuffleId); + auditContext.withAppId(appId); auditContext.withArgs( String.format( - "partitionNum=%d, partitionNumPerRange=%d, replica=%d, requiredTags=%s, " + "shuffleId=%d, partitionNum=%d, partitionNumPerRange=%d, replica=%d, requiredTags=%s, " + "requiredShuffleServerNumber=%d, faultyServerIds=%s, stageId=%d, stageAttemptNumber=%d, isReassign=%b", + shuffleId, partitionNum, partitionNumPerRange, replica, @@ -216,7 +218,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer public void heartbeat( ShuffleServerHeartBeatRequest request, StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("heartbeat")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("heartbeat")) { final ServerNode serverNode = toServerNode(request); auditContext.withArgs("serverNode=" + serverNode.getId()); coordinatorServer.getClusterManager().add(serverNode); @@ -237,7 +239,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void checkServiceAvailable( Empty request, StreamObserver<CheckServiceAvailableResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("checkServiceAvailable")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("checkServiceAvailable")) { final CheckServiceAvailableResponse response = CheckServiceAvailableResponse.newBuilder() .setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0) @@ -252,7 +254,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer public void reportClientOperation( ReportShuffleClientOpRequest request, StreamObserver<ReportShuffleClientOpResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("reportClientOperation")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("reportClientOperation")) { final String clientHost = request.getClientHost(); final int clientPort = request.getClientPort(); final ShuffleServerId shuffleServer = request.getServer(); @@ -275,7 +277,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void appHeartbeat( AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("appHeartbeat")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("appHeartbeat")) { String appId = request.getAppId(); auditContext.withAppId(appId); coordinatorServer.getApplicationManager().refreshAppId(appId); @@ -302,7 +304,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void registerApplicationInfo( ApplicationInfoRequest request, StreamObserver<ApplicationInfoResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("registerApplicationInfo")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("registerApplicationInfo")) { String appId = request.getAppId(); String user = request.getUser(); auditContext.withAppId(appId).withArgs("user=" + user); @@ -332,7 +334,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void accessCluster( AccessClusterRequest request, StreamObserver<AccessClusterResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("accessCluster")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("accessCluster")) { StatusCode statusCode = StatusCode.SUCCESS; AccessClusterResponse response; AccessManager accessManager = coordinatorServer.getAccessManager(); @@ -376,7 +378,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void fetchClientConf( Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("fetchClientConf")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("fetchClientConf")) { fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, responseObserver); auditContext.withStatusCode(StatusCode.SUCCESS); } @@ -385,7 +387,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer @Override public void fetchClientConfV2( FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("fetchClientConfV2")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("fetchClientConfV2")) { fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), responseObserver); auditContext.withStatusCode(StatusCode.SUCCESS); } @@ -426,7 +428,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer public void fetchRemoteStorage( FetchRemoteStorageRequest request, StreamObserver<FetchRemoteStorageResponse> responseObserver) { - try (CoordinatorRPCAuditContext auditContext = createAuditContext("fetchRemoteStorage")) { + try (CoordinatorRpcAuditContext auditContext = createAuditContext("fetchRemoteStorage")) { FetchRemoteStorageResponse response; StatusCode status = StatusCode.SUCCESS; String appId = request.getAppId(); @@ -523,20 +525,23 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer } /** - * Creates a {@link CoordinatorRPCAuditContext} instance. + * Creates a {@link CoordinatorRpcAuditContext} instance. * - * @param command the command to be logged by this {@link AuditContext} - * @return newly-created {@link CoordinatorRPCAuditContext} instance + * @param command the command to be logged by this {@link RpcAuditContext} + * @return newly-created {@link CoordinatorRpcAuditContext} instance */ - private CoordinatorRPCAuditContext createAuditContext(String command) { + private CoordinatorRpcAuditContext createAuditContext(String command) { // Audit log may be enabled during runtime Logger auditLogger = null; if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) { auditLogger = AUDIT_LOGGER; } - CoordinatorRPCAuditContext auditContext = new CoordinatorRPCAuditContext(auditLogger); + CoordinatorRpcAuditContext auditContext = new CoordinatorRpcAuditContext(auditLogger); if (auditLogger != null) { - auditContext.withCommand(command).withCreationTimeNs(System.nanoTime()); + auditContext + .withCommand(command) + .withFrom(ClientContextServerInterceptor.getIpAddress()) + .withCreationTimeNs(System.nanoTime()); } return auditContext; } diff --git a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java similarity index 55% copy from common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java copy to coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java index 77fbb9374..a23ae2ba3 100644 --- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java @@ -15,12 +15,32 @@ * limitations under the License. */ -package org.apache.uniffle.common.audit; +package org.apache.uniffle.coordinator.audit; -import java.io.Closeable; +import org.slf4j.Logger; + +import org.apache.uniffle.common.audit.RpcAuditContext; + +/** An audit context for coordinator rpc. */ +public class CoordinatorRpcAuditContext extends RpcAuditContext { + private String appId = "N/A"; + + /** + * Constructor of {@link CoordinatorRpcAuditContext}. + * + * @param log the logger to log the audit information + */ + public CoordinatorRpcAuditContext(Logger log) { + super(log); + } -/** Context for audit logging. */ -public interface AuditContext extends Closeable { @Override - void close(); + protected String content() { + return String.format("appId=%s", appId); + } + + public CoordinatorRpcAuditContext withAppId(String appId) { + this.appId = appId; + return this; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index b0fe9d9ef..5cce3a3a8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -47,13 +47,14 @@ import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; -import org.apache.uniffle.common.audit.AuditContext; +import org.apache.uniffle.common.audit.RpcAuditContext; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException; import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.common.exception.NoBufferException; import org.apache.uniffle.common.exception.NoBufferForHugePartitionException; import org.apache.uniffle.common.exception.NoRegisterException; +import org.apache.uniffle.common.rpc.ClientContextServerInterceptor; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ByteBufUtils; @@ -90,7 +91,7 @@ import org.apache.uniffle.proto.RssProtos.ShufflePartitionRange; import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest; import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse; import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase; -import org.apache.uniffle.server.audit.ServerRPCAuditContext; +import org.apache.uniffle.server.audit.ServerRpcAuditContext; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageReadMetrics; @@ -125,7 +126,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void unregisterShuffleByAppId( RssProtos.ShuffleUnregisterByAppIdRequest request, StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse> responseStreamObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("unregisterShuffleByAppId")) { + try (ServerRpcAuditContext auditContext = createAuditContext("unregisterShuffleByAppId")) { String appId = request.getAppId(); auditContext.withAppId(appId); StatusCode status = verifyRequest(appId); @@ -163,7 +164,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void unregisterShuffle( RssProtos.ShuffleUnregisterRequest request, StreamObserver<RssProtos.ShuffleUnregisterResponse> responseStreamObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("unregisterShuffle")) { + try (ServerRpcAuditContext auditContext = createAuditContext("unregisterShuffle")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); auditContext.withAppId(appId).withShuffleId(shuffleId); @@ -200,7 +201,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void registerShuffle( ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("registerShuffle")) { + try (ServerRpcAuditContext auditContext = createAuditContext("registerShuffle")) { ShuffleRegisterResponse reply; String appId = req.getAppId(); int shuffleId = req.getShuffleId(); @@ -306,7 +307,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void sendShuffleData( SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("sendShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData")) { SendShuffleDataResponse reply; String appId = req.getAppId(); int shuffleId = req.getShuffleId(); @@ -519,7 +520,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void commitShuffleTask( ShuffleCommitRequest req, StreamObserver<ShuffleCommitResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("commitShuffleTask")) { + try (ServerRpcAuditContext auditContext = createAuditContext("commitShuffleTask")) { String appId = req.getAppId(); int shuffleId = req.getShuffleId(); auditContext.withAppId(appId).withShuffleId(shuffleId); @@ -576,7 +577,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void finishShuffle( FinishShuffleRequest req, StreamObserver<FinishShuffleResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("finishShuffle")) { + try (ServerRpcAuditContext auditContext = createAuditContext("finishShuffle")) { String appId = req.getAppId(); int shuffleId = req.getShuffleId(); auditContext.withAppId(appId).withShuffleId(shuffleId); @@ -624,7 +625,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void requireBuffer( RequireBufferRequest request, StreamObserver<RequireBufferResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("requireBuffer")) { + try (ServerRpcAuditContext auditContext = createAuditContext("requireBuffer")) { String appId = request.getAppId(); auditContext.withAppId(appId).withShuffleId(request.getShuffleId()); String auditArgs = "requireSize=" + request.getRequireSize(); @@ -704,7 +705,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void appHeartbeat( AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("appHeartbeat")) { + try (ServerRpcAuditContext auditContext = createAuditContext("appHeartbeat")) { String appId = request.getAppId(); auditContext.withAppId(appId); StatusCode status = verifyRequest(appId); @@ -745,7 +746,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void reportShuffleResult( ReportShuffleResultRequest request, StreamObserver<ReportShuffleResultResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("reportShuffleResult")) { + try (ServerRpcAuditContext auditContext = createAuditContext("reportShuffleResult")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); long taskAttemptId = request.getTaskAttemptId(); @@ -825,7 +826,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { @Override public void getShuffleResult( GetShuffleResultRequest request, StreamObserver<GetShuffleResultResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("getShuffleResult")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getShuffleResult")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); int partitionId = request.getPartitionId(); @@ -892,7 +893,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void getShuffleResultForMultiPart( GetShuffleResultForMultiPartRequest request, StreamObserver<GetShuffleResultForMultiPartResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("getShuffleResultForMultiPart")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getShuffleResultForMultiPart")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); List<Integer> partitionsList = request.getPartitionsList(); @@ -962,7 +963,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void getLocalShuffleData( GetLocalShuffleDataRequest request, StreamObserver<GetLocalShuffleDataResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); int partitionId = request.getPartitionId(); @@ -1108,7 +1109,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void getLocalShuffleIndex( GetLocalShuffleIndexRequest request, StreamObserver<GetLocalShuffleIndexResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); int partitionId = request.getPartitionId(); @@ -1232,7 +1233,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { public void getMemoryShuffleData( GetMemoryShuffleDataRequest request, StreamObserver<GetMemoryShuffleDataResponse> responseObserver) { - try (ServerRPCAuditContext auditContext = createAuditContext("getMemoryShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData")) { String appId = request.getAppId(); int shuffleId = request.getShuffleId(); int partitionId = request.getPartitionId(); @@ -1450,20 +1451,23 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { } /** - * Creates a {@link ServerRPCAuditContext} instance. + * Creates a {@link ServerRpcAuditContext} instance. * - * @param command the command to be logged by this {@link AuditContext} - * @return newly-created {@link ServerRPCAuditContext} instance + * @param command the command to be logged by this {@link RpcAuditContext} + * @return newly-created {@link ServerRpcAuditContext} instance */ - private ServerRPCAuditContext createAuditContext(String command) { + private ServerRpcAuditContext createAuditContext(String command) { // Audit log may be enabled during runtime Logger auditLogger = null; if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) { auditLogger = AUDIT_LOGGER; } - ServerRPCAuditContext auditContext = new ServerRPCAuditContext(auditLogger); + ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger); if (auditLogger != null) { - auditContext.withCommand(command).withCreationTimeNs(System.nanoTime()); + auditContext + .withCommand(command) + .withFrom(ClientContextServerInterceptor.getIpAddress()) + .withCreationTimeNs(System.nanoTime()); } return auditContext; } diff --git a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java deleted file mode 100644 index b78c606a5..000000000 --- a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.server.audit; - -import org.slf4j.Logger; - -import org.apache.uniffle.common.audit.AuditContext; -import org.apache.uniffle.common.rpc.StatusCode; - -/** An audit context for shuffle server rpc. */ -public class ServerRPCAuditContext implements AuditContext { - private final Logger log; - private String command; - private String statusCode; - private long creationTimeNs; - private long executionTimeNs; - private String appId = "N/A"; - private int shuffleId = -1; - private String args; - private String returnValue; - - /** - * Constructor of {@link ServerRPCAuditContext}. - * - * @param log the logger to log the audit information - */ - public ServerRPCAuditContext(Logger log) { - this.log = log; - } - - /** - * Sets mCommand field. - * - * @param command the command associated with shuffle server rpc - * @return this {@link AuditContext} instance - */ - public ServerRPCAuditContext withCommand(String command) { - this.command = command; - return this; - } - - /** - * Sets creationTimeNs field. - * - * @param creationTimeNs the System.nanoTime() when this operation create, it only can be used to - * compute operation mExecutionTime - * @return this {@link AuditContext} instance - */ - public ServerRPCAuditContext withCreationTimeNs(long creationTimeNs) { - this.creationTimeNs = creationTimeNs; - return this; - } - - /** - * Sets status code field. - * - * @param statusCode the status code - * @return this {@link AuditContext} instance - */ - public ServerRPCAuditContext withStatusCode(StatusCode statusCode) { - this.statusCode = statusCode.name(); - return this; - } - - /** - * Sets status code field. - * - * @param statusCode the status code - * @return this {@link AuditContext} instance - */ - public ServerRPCAuditContext withStatusCode(String statusCode) { - this.statusCode = statusCode; - return this; - } - - @Override - public void close() { - if (log == null) { - return; - } - executionTimeNs = System.nanoTime() - creationTimeNs; - log.info(toString()); - } - - @Override - public String toString() { - String line = - String.format( - "cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d", - command, statusCode, appId, shuffleId, executionTimeNs / 1000); - if (args != null) { - line += String.format("\targs{%s}", args); - } - if (returnValue != null) { - line += String.format("\treturn{%s}", returnValue); - } - return line; - } - - public ServerRPCAuditContext withAppId(String appId) { - this.appId = appId; - return this; - } - - public ServerRPCAuditContext withShuffleId(int shuffleId) { - this.shuffleId = shuffleId; - return this; - } - - public ServerRPCAuditContext withArgs(String args) { - this.args = args; - return this; - } - - public ServerRPCAuditContext withReturnValue(String returnValue) { - this.returnValue = returnValue; - return this; - } -} diff --git a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java similarity index 50% rename from common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java rename to server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java index 77fbb9374..00c98f518 100644 --- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java +++ b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java @@ -15,12 +15,38 @@ * limitations under the License. */ -package org.apache.uniffle.common.audit; +package org.apache.uniffle.server.audit; -import java.io.Closeable; +import org.slf4j.Logger; + +import org.apache.uniffle.common.audit.RpcAuditContext; + +/** An audit context for shuffle server rpc. */ +public class ServerRpcAuditContext extends RpcAuditContext { + private String appId = "N/A"; + private int shuffleId = -1; + + /** + * Constructor of {@link ServerRpcAuditContext}. + * + * @param log the logger to log the audit information + */ + public ServerRpcAuditContext(Logger log) { + super(log); + } -/** Context for audit logging. */ -public interface AuditContext extends Closeable { @Override - void close(); + protected String content() { + return String.format("appId=%s\tshuffleId=%s", appId, shuffleId); + } + + public ServerRpcAuditContext withAppId(String appId) { + this.appId = appId; + return this; + } + + public ServerRpcAuditContext withShuffleId(int shuffleId) { + this.shuffleId = shuffleId; + return this; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index fdb723dfb..fb4d824d9 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -40,7 +40,7 @@ import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; -import org.apache.uniffle.common.audit.AuditContext; +import org.apache.uniffle.common.audit.RpcAuditContext; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException; import org.apache.uniffle.common.exception.FileNotFoundException; @@ -65,7 +65,7 @@ import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; import org.apache.uniffle.server.ShuffleTaskInfo; import org.apache.uniffle.server.ShuffleTaskManager; -import org.apache.uniffle.server.audit.ServerRPCAuditContext; +import org.apache.uniffle.server.audit.ServerRpcAuditContext; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; import org.apache.uniffle.server.buffer.ShuffleBufferManager; import org.apache.uniffle.storage.common.Storage; @@ -121,7 +121,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { } public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) { - try (ServerRPCAuditContext auditContext = createAuditContext("sendShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData", client)) { RpcResponse rpcResponse; String appId = req.getAppId(); int shuffleId = req.getShuffleId(); @@ -386,7 +386,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { public void handleGetMemoryShuffleDataRequest( TransportClient client, GetMemoryShuffleDataRequest req) { - try (ServerRPCAuditContext auditContext = createAuditContext("getMemoryShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData", client)) { String appId = req.getAppId(); int shuffleId = req.getShuffleId(); int partitionId = req.getPartitionId(); @@ -497,7 +497,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { public void handleGetLocalShuffleIndexRequest( TransportClient client, GetLocalShuffleIndexRequest req) { - try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex", client)) { String appId = req.getAppId(); int shuffleId = req.getShuffleId(); int partitionId = req.getPartitionId(); @@ -607,7 +607,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { } public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) { - try (ServerRPCAuditContext auditContext = createAuditContext("getLocalShuffleData")) { + try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData", client)) { GetLocalShuffleDataResponse response; String appId = req.getAppId(); int shuffleId = req.getShuffleId(); @@ -879,20 +879,24 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { } /** - * Creates a {@link ServerRPCAuditContext} instance. + * Creates a {@link ServerRpcAuditContext} instance. * - * @param command the command to be logged by this {@link AuditContext} - * @return newly-created {@link ServerRPCAuditContext} instance + * @param command the command to be logged by this {@link RpcAuditContext} + * @return newly-created {@link ServerRpcAuditContext} instance */ - private ServerRPCAuditContext createAuditContext(String command) { + private ServerRpcAuditContext createAuditContext( + String command, TransportClient transportClient) { // Audit log may be enabled during runtime Logger auditLogger = null; if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) { auditLogger = AUDIT_LOGGER; } - ServerRPCAuditContext auditContext = new ServerRPCAuditContext(auditLogger); + ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger); if (auditLogger != null) { - auditContext.withCommand(command).withCreationTimeNs(System.nanoTime()); + auditContext + .withCommand(command) + .withFrom(transportClient.getSocketAddress().toString()) + .withCreationTimeNs(System.nanoTime()); } return auditContext; }