This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 704c70c3c [#1873] feat(server): Add audit log support for write and delete operations (#1874)
704c70c3c is described below

commit 704c70c3ca9e33e195708e3a93067438ea12aa9a
Author: RickyMa <rick...@tencent.com>
AuthorDate: Tue Jul 9 14:16:03 2024 +0800

    [#1873] feat(server): Add audit log support for write and delete operations (#1874)
    
    ### What changes were proposed in this pull request?
    
    Add audit log support for write and delete operations.
    Introduce a switch, `rss.server.audit.log.enabled`, which defaults to `false`, so audit logging is off unless explicitly enabled.
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1873.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The new switch `rss.server.audit.log.enabled` is opt-in and defaults to `false`.
    
    ### How was this patch tested?
    
    Existing tests.
    For write operations, audit.log entries look like this
    (fields: `w`|appId|shuffleId|startPartition_endPartition|storageHost|storagePath|size|startTime|endTime|durationMs):
    ```
    w|application_1716779728283_7775663_1720409662460|1|858_858|local|/data7/rssdata|56401402|2024-07-08 11:36:32|2024-07-08 11:36:32|85
    w|application_1716779728283_7775663_1720409662460|1|860_860|local|/data13/rssdata|56365595|2024-07-08 11:36:32|2024-07-08 11:36:32|76
    w|application_1703049085550_15349071_1720381124695|0|520_520|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online|203300442|2024-07-08 03:41:11|2024-07-08 03:41:14|2543
    ```
    
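    For delete operations, audit.log entries look like this
    (fields: `d`|appId|storageHost|storagePath; deletions of specific shuffles
    add a comma-separated list of shuffle IDs right after appId):
    ```
    d|application_1716779728283_7775663_1720409662460|local|/data1/rssdata
    d|application_1716779728283_7775663_1720409662460|local|/data2/rssdata
    d|application_1703049085550_15349071_1720381124695|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online
    ```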
---
 bin/start-shuffle-server.sh                        |  3 +-
 .../java/org/apache/uniffle/common/AuditType.java  | 32 ++++++++++++++++++++++
 conf/log4j2.xml                                    | 10 +++++++
 .../apache/uniffle/server/ShuffleFlushManager.java | 24 ++++++++++++++++
 .../apache/uniffle/server/ShuffleServerConf.java   | 11 ++++++++
 .../server/storage/HadoopStorageManager.java       | 25 +++++++++++++++++
 .../server/storage/LocalStorageManager.java        | 23 ++++++++++++++++
 7 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/bin/start-shuffle-server.sh b/bin/start-shuffle-server.sh
index 1b1afe524..d6186b386 100755
--- a/bin/start-shuffle-server.sh
+++ b/bin/start-shuffle-server.sh
@@ -30,6 +30,7 @@ SHUFFLE_SERVER_CONF_FILE="${RSS_CONF_DIR}/server.conf"
 JAR_DIR="${RSS_HOME}/jars"
 LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
 LOG_PATH="${RSS_LOG_DIR}/shuffle_server.log"
+AUDIT_LOG_PATH="${RSS_LOG_DIR}/audit.log"
 
 if [ -z "${XMX_SIZE:-}" ]; then
   echo "No env XMX_SIZE."
@@ -131,7 +132,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
 ARGS=""
 
 if [ -f ${LOG_CONF_FILE} ]; then
-  ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} 
-Dlog.path=${LOG_PATH}"
+  ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} 
-Dlog.path=${LOG_PATH} -Daudit.log.path=${AUDIT_LOG_PATH}"
 else
   echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
   exit 1
diff --git a/common/src/main/java/org/apache/uniffle/common/AuditType.java 
b/common/src/main/java/org/apache/uniffle/common/AuditType.java
new file mode 100644
index 000000000..7ed9bda3e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/AuditType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum AuditType {
+  WRITE("w"), // logged for shuffle data writes
+  DELETE("d"); // logged for application/shuffle data deletions
+  private final String value;
+
+  AuditType(String value) {
+    this.value = value;
+  }
+
+  public String getValue() {
+    return value;
+  }
+}
diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index 6ea652a08..b05d669ac 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -27,6 +27,13 @@
       </Policies>
       <DefaultRolloverStrategy max="10"/>
     </RollingFile>
+    <RollingFile name="auditAppender" fileName="${sys:audit.log.path}" 
filePattern="${sys:audit.log.path}.%i">
+      <PatternLayout pattern="%m%n"/>
+      <Policies>
+        <SizeBasedTriggeringPolicy size="2GB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingFile>
   </Appenders>
   <Loggers>
     <Root level="info">
@@ -45,5 +52,8 @@
       <AppenderRef ref="console"/>
       <AppenderRef ref="RollingAppender"/>
     </Logger>
+    <Logger name="audit" level="INFO" additivity="false">
+      <AppenderRef ref="auditAppender"/>
+    </Logger>
   </Loggers>
 </Configuration>
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 9237884fd..92a3da081 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -26,11 +26,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.AuditType;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
@@ -50,6 +52,8 @@ import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY
 public class ShuffleFlushManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleFlushManager.class);
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
+  private static final String AUDIT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
   public static final AtomicLong ATOMIC_EVENT_ID = new AtomicLong(0);
   private final ShuffleServer shuffleServer;
   private final List<String> storageBasePaths;
@@ -65,6 +69,7 @@ public class ShuffleFlushManager {
   private final StorageManager storageManager;
   private final long pendingEventTimeoutSec;
   private FlushEventHandler eventHandler;
+  private final boolean isAuditLogEnabled;
 
   public ShuffleFlushManager(
       ShuffleServerConf shuffleServerConf,
@@ -83,6 +88,8 @@ public class ShuffleFlushManager {
     eventHandler =
         new DefaultFlushEventHandler(
             shuffleServerConf, storageManager, shuffleServer, 
this::processFlushEvent);
+    isAuditLogEnabled =
+        
this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
   }
 
   public void addToFlushQueue(ShuffleDataFlushEvent event) {
@@ -162,15 +169,32 @@ public class ShuffleFlushManager {
               user,
               maxConcurrencyPerPartitionToWrite);
       ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+      long startTime = System.currentTimeMillis();
       boolean writeSuccess = storageManager.write(storage, handler, event);
       if (!writeSuccess) {
         throw new EventRetryException();
       }
+      long endTime = System.currentTimeMillis();
 
       // update some metrics for shuffle task
       updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
event.getShuffleBlocks());
       ShuffleTaskInfo shuffleTaskInfo =
           
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+      if (isAuditLogEnabled) {
+        AUDIT_LOGGER.info(
+            String.format(
+                "%s|%s|%d|%s|%s|%s|%d|%s|%s|%d",
+                AuditType.WRITE.getValue(),
+                event.getAppId(),
+                event.getShuffleId(),
+                event.getStartPartition() + "_" + event.getEndPartition(),
+                event.getUnderStorage().getStorageHost(),
+                event.getUnderStorage().getStoragePath(),
+                event.getSize(),
+                DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN),
+                DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN),
+                endTime - startTime));
+      }
       if (null != shuffleTaskInfo) {
         String storageHost = event.getUnderStorage().getStorageHost();
         if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ebc4aaccb..55a43928d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -158,6 +158,17 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(10000L)
           .withDescription("Threshold for write slow defined");
 
+  public static final ConfigOption<Boolean> SERVER_AUDIT_LOG_ENABLED =
+      ConfigOptions.key("rss.server.audit.log.enabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "When set to true, for auditing purposes, the server will log 
audit records for every disk write and delete operation. "
+                  + "Each file write is logged, while delete operations are 
specific to application ID/shuffle ID, "
+                  + "removing all associated files and recording the deletion 
of the entire application ID or shuffle ID. "
+                  + "For a write operation, it includes the size of the data 
written, the storage type and the specific disk to which it is written "
+                  + "(for instance, in scenarios where multiple local disks 
are mounted).");
+
   public static final ConfigOption<Long> SERVER_EVENT_SIZE_THRESHOLD_L1 =
       ConfigOptions.key("rss.server.event.size.threshold.l1")
           .longType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 7ab9ef639..90d62a1ed 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.AuditType;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -55,16 +56,19 @@ import org.apache.uniffle.storage.util.StorageType;
 public class HadoopStorageManager extends SingleStorageManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStorageManager.class);
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
 
   private final Configuration hadoopConf;
   private final String shuffleServerId;
   private Map<String, HadoopStorage> appIdToStorages = 
JavaUtils.newConcurrentMap();
   private Map<String, HadoopStorage> pathToStorages = 
JavaUtils.newConcurrentMap();
+  private final boolean isAuditLogEnabled;
 
   HadoopStorageManager(ShuffleServerConf conf) {
     super(conf);
     hadoopConf = conf.getHadoopConf();
     shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, 
"shuffleServerId");
+    isAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
   }
 
   @Override
@@ -116,11 +120,32 @@ public class HadoopStorageManager extends 
SingleStorageManager {
 
       if (event instanceof AppPurgeEvent) {
         deletePaths.add(basicPath);
+        if (isAuditLogEnabled) {
+          AUDIT_LOGGER.info(
+              String.format(
+                  "%s|%s|%s|%s",
+                  AuditType.DELETE.getValue(),
+                  appId,
+                  storage.getStorageHost(),
+                  storage.getStoragePath()));
+        }
       } else {
         for (Integer shuffleId : event.getShuffleIds()) {
           deletePaths.add(
               ShuffleStorageUtils.getFullShuffleDataFolder(basicPath, 
String.valueOf(shuffleId)));
         }
+        if (isAuditLogEnabled) {
+          AUDIT_LOGGER.info(
+              String.format(
+                  "%s|%s|%s|%s|%s",
+                  AuditType.DELETE.getValue(),
+                  appId,
+                  event.getShuffleIds().stream()
+                      .map(Object::toString)
+                      .collect(Collectors.joining(",")),
+                  storage.getStorageHost(),
+                  storage.getStoragePath()));
+        }
       }
       deleteHandler.delete(deletePaths.toArray(new String[0]), appId, 
event.getUser());
     } else {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 724201231..c55fe9060 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.AuditType;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.UnionKey;
 import org.apache.uniffle.common.exception.RssException;
@@ -77,6 +78,7 @@ import static 
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALI
 
 public class LocalStorageManager extends SingleStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalStorageManager.class);
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
   private static final String UNKNOWN_USER_NAME = "unknown";
 
   private final List<LocalStorage> localStorages;
@@ -86,6 +88,8 @@ public class LocalStorageManager extends SingleStorageManager 
{
   private final ConcurrentSkipListMap<String, LocalStorage> 
sortedPartitionsOfStorageMap;
   private final List<StorageMediaProvider> typeProviders = 
Lists.newArrayList();
 
+  private final boolean isAuditLogEnabled;
+
   @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
     super(conf);
@@ -169,6 +173,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
         StringUtils.join(
             
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
     this.checker = new LocalStorageChecker(conf, localStorages);
+    isAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
   }
 
   private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -295,8 +300,26 @@ public class LocalStorageManager extends 
SingleStorageManager {
                           ShuffleStorageUtils.getFullShuffleDataFolder(
                               basicPath, String.valueOf(shuffleId)));
                     }
+                    if (isAuditLogEnabled) {
+                      AUDIT_LOGGER.info(
+                          String.format(
+                              "%s|%s|%s|%s|%s",
+                              AuditType.DELETE.getValue(),
+                              appId,
+                              shuffleSet.stream()
+                                  .map(Object::toString)
+                                  .collect(Collectors.joining(",")),
+                              LocalStorage.STORAGE_HOST,
+                              path));
+                    }
                     return paths.stream();
                   } else {
+                    if (isAuditLogEnabled) {
+                      AUDIT_LOGGER.info(
+                          String.format(
+                              "%s|%s|%s|%s",
+                              AuditType.DELETE.getValue(), appId, 
LocalStorage.STORAGE_HOST, path));
+                    }
                     return Stream.of(basicPath);
                   }
                 })

Reply via email to