This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 704c70c3c [#1873] feat(server): Add audit log support for write and delete operations (#1874) 704c70c3c is described below commit 704c70c3ca9e33e195708e3a93067438ea12aa9a Author: RickyMa <rick...@tencent.com> AuthorDate: Tue Jul 9 14:16:03 2024 +0800 [#1873] feat(server): Add audit log support for write and delete operations (#1874) ### What changes were proposed in this pull request? Add audit log support for write and delete operations. Introduce a switch `rss.server.audit.log.enabled`; the default value is `false`. ### Why are the changes needed? For https://github.com/apache/incubator-uniffle/issues/1873. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. For write operations, the audit.log will be: ``` w|application_1716779728283_7775663_1720409662460|1|858_858|local|/data7/rssdata|56401402|2024-07-08 11:36:32|2024-07-08 11:36:32|85 w|application_1716779728283_7775663_1720409662460|1|860_860|local|/data13/rssdata|56365595|2024-07-08 11:36:32|2024-07-08 11:36:32|76 w|application_1703049085550_15349071_1720381124695|0|520_520|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online|203300442|2024-07-08 03:41:11|2024-07-08 03:41:14|2543 ``` For delete operations, the audit.log will be: ``` d|application_1716779728283_7775663_1720409662460|local|/data1/rssdata d|application_1716779728283_7775663_1720409662460|local|/data2/rssdata d|application_1703049085550_15349071_1720381124695|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online ``` When only specific shuffle IDs are purged (rather than the whole application), the delete record additionally carries the comma-separated shuffle IDs: `d|<appId>|<shuffleIds>|<storageHost>|<storagePath>`. --- bin/start-shuffle-server.sh | 3 +- .../java/org/apache/uniffle/common/AuditType.java | 32 ++++++++++++++++++++++ conf/log4j2.xml | 10 +++++++ .../apache/uniffle/server/ShuffleFlushManager.java | 24 ++++++++++++++++ .../apache/uniffle/server/ShuffleServerConf.java | 11 ++++++++ .../server/storage/HadoopStorageManager.java | 25 +++++++++++++++++ .../server/storage/LocalStorageManager.java | 23 ++++++++++++++++ 7 files changed, 
127 insertions(+), 1 deletion(-) diff --git a/bin/start-shuffle-server.sh b/bin/start-shuffle-server.sh index 1b1afe524..d6186b386 100755 --- a/bin/start-shuffle-server.sh +++ b/bin/start-shuffle-server.sh @@ -30,6 +30,7 @@ SHUFFLE_SERVER_CONF_FILE="${RSS_CONF_DIR}/server.conf" JAR_DIR="${RSS_HOME}/jars" LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml" LOG_PATH="${RSS_LOG_DIR}/shuffle_server.log" +AUDIT_LOG_PATH="${RSS_LOG_DIR}/audit.log" if [ -z "${XMX_SIZE:-}" ]; then echo "No env XMX_SIZE." @@ -131,7 +132,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \ ARGS="" if [ -f ${LOG_CONF_FILE} ]; then - ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH}" + ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH} -Daudit.log.path=${AUDIT_LOG_PATH}" else echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist." exit 1 diff --git a/common/src/main/java/org/apache/uniffle/common/AuditType.java b/common/src/main/java/org/apache/uniffle/common/AuditType.java new file mode 100644 index 000000000..7ed9bda3e --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/AuditType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.common; + +public enum AuditType { + WRITE("w"), + DELETE("d"); + private String value; + + AuditType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/conf/log4j2.xml b/conf/log4j2.xml index 6ea652a08..b05d669ac 100644 --- a/conf/log4j2.xml +++ b/conf/log4j2.xml @@ -27,6 +27,13 @@ </Policies> <DefaultRolloverStrategy max="10"/> </RollingFile> + <RollingFile name="auditAppender" fileName="${sys:audit.log.path}" filePattern="${sys:audit.log.path}.%i"> + <PatternLayout pattern="%m%n"/> + <Policies> + <SizeBasedTriggeringPolicy size="2GB"/> + </Policies> + <DefaultRolloverStrategy max="10"/> + </RollingFile> </Appenders> <Loggers> <Root level="info"> @@ -45,5 +52,8 @@ <AppenderRef ref="console"/> <AppenderRef ref="RollingAppender"/> </Logger> + <Logger name="audit" level="INFO" additivity="false"> + <AppenderRef ref="auditAppender"/> + </Logger> </Loggers> </Configuration> \ No newline at end of file diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 9237884fd..92a3da081 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -26,11 +26,13 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.hadoop.conf.Configuration; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.AuditType; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShufflePartitionedBlock; import 
org.apache.uniffle.common.config.RssBaseConf; @@ -50,6 +52,8 @@ import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY public class ShuffleFlushManager { private static final Logger LOG = LoggerFactory.getLogger(ShuffleFlushManager.class); + private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit"); + private static final String AUDIT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss"; public static final AtomicLong ATOMIC_EVENT_ID = new AtomicLong(0); private final ShuffleServer shuffleServer; private final List<String> storageBasePaths; @@ -65,6 +69,7 @@ public class ShuffleFlushManager { private final StorageManager storageManager; private final long pendingEventTimeoutSec; private FlushEventHandler eventHandler; + private final boolean isAuditLogEnabled; public ShuffleFlushManager( ShuffleServerConf shuffleServerConf, @@ -83,6 +88,8 @@ public class ShuffleFlushManager { eventHandler = new DefaultFlushEventHandler( shuffleServerConf, storageManager, shuffleServer, this::processFlushEvent); + isAuditLogEnabled = + this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED); } public void addToFlushQueue(ShuffleDataFlushEvent event) { @@ -162,15 +169,32 @@ public class ShuffleFlushManager { user, maxConcurrencyPerPartitionToWrite); ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); + long startTime = System.currentTimeMillis(); boolean writeSuccess = storageManager.write(storage, handler, event); if (!writeSuccess) { throw new EventRetryException(); } + long endTime = System.currentTimeMillis(); // update some metrics for shuffle task updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks()); ShuffleTaskInfo shuffleTaskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId()); + if (isAuditLogEnabled) { + AUDIT_LOGGER.info( + String.format( + "%s|%s|%d|%s|%s|%s|%d|%s|%s|%d", + AuditType.WRITE.getValue(), + event.getAppId(), + 
event.getShuffleId(), + event.getStartPartition() + "_" + event.getEndPartition(), + event.getUnderStorage().getStorageHost(), + event.getUnderStorage().getStoragePath(), + event.getSize(), + DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN), + DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN), + endTime - startTime)); + } if (null != shuffleTaskInfo) { String storageHost = event.getUnderStorage().getStorageHost(); if (LocalStorage.STORAGE_HOST.equals(storageHost)) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index ebc4aaccb..55a43928d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -158,6 +158,17 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(10000L) .withDescription("Threshold for write slow defined"); + public static final ConfigOption<Boolean> SERVER_AUDIT_LOG_ENABLED = + ConfigOptions.key("rss.server.audit.log.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "When set to true, for auditing purposes, the server will log audit records for every disk write and delete operation. " + + "Each file write is logged, while delete operations are specific to application ID/shuffle ID, " + + "removing all associated files and recording the deletion of the entire application ID or shuffle ID. 
" + + "For a write operation, it includes the size of the data written, the storage type and the specific disk to which it is written " + + "(for instance, in scenarios where multiple local disks are mounted)."); + public static final ConfigOption<Long> SERVER_EVENT_SIZE_THRESHOLD_L1 = ConfigOptions.key("rss.server.event.size.threshold.l1") .longType() diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 7ab9ef639..90d62a1ed 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.AuditType; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; @@ -55,16 +56,19 @@ import org.apache.uniffle.storage.util.StorageType; public class HadoopStorageManager extends SingleStorageManager { private static final Logger LOG = LoggerFactory.getLogger(HadoopStorageManager.class); + private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit"); private final Configuration hadoopConf; private final String shuffleServerId; private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap(); private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap(); + private final boolean isAuditLogEnabled; HadoopStorageManager(ShuffleServerConf conf) { super(conf); hadoopConf = conf.getHadoopConf(); shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId"); + isAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED); } @Override @@ -116,11 +120,32 @@ public class HadoopStorageManager 
extends SingleStorageManager { if (event instanceof AppPurgeEvent) { deletePaths.add(basicPath); + if (isAuditLogEnabled) { + AUDIT_LOGGER.info( + String.format( + "%s|%s|%s|%s", + AuditType.DELETE.getValue(), + appId, + storage.getStorageHost(), + storage.getStoragePath())); + } } else { for (Integer shuffleId : event.getShuffleIds()) { deletePaths.add( ShuffleStorageUtils.getFullShuffleDataFolder(basicPath, String.valueOf(shuffleId))); } + if (isAuditLogEnabled) { + AUDIT_LOGGER.info( + String.format( + "%s|%s|%s|%s|%s", + AuditType.DELETE.getValue(), + appId, + event.getShuffleIds().stream() + .map(Object::toString) + .collect(Collectors.joining(",")), + storage.getStorageHost(), + storage.getStoragePath())); + } } deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); } else { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 724201231..c55fe9060 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.AuditType; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.UnionKey; import org.apache.uniffle.common.exception.RssException; @@ -77,6 +78,7 @@ import static org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALI public class LocalStorageManager extends SingleStorageManager { private static final Logger LOG = LoggerFactory.getLogger(LocalStorageManager.class); + private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit"); private static final String UNKNOWN_USER_NAME = "unknown"; private final List<LocalStorage> localStorages; @@ -86,6 +88,8 @@ public class 
LocalStorageManager extends SingleStorageManager { private final ConcurrentSkipListMap<String, LocalStorage> sortedPartitionsOfStorageMap; private final List<StorageMediaProvider> typeProviders = Lists.newArrayList(); + private final boolean isAuditLogEnabled; + @VisibleForTesting LocalStorageManager(ShuffleServerConf conf) { super(conf); @@ -169,6 +173,7 @@ public class LocalStorageManager extends SingleStorageManager { StringUtils.join( localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList()))); this.checker = new LocalStorageChecker(conf, localStorages); + isAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED); } private StorageMedia getStorageTypeForBasePath(String basePath) { @@ -295,8 +300,26 @@ public class LocalStorageManager extends SingleStorageManager { ShuffleStorageUtils.getFullShuffleDataFolder( basicPath, String.valueOf(shuffleId))); } + if (isAuditLogEnabled) { + AUDIT_LOGGER.info( + String.format( + "%s|%s|%s|%s|%s", + AuditType.DELETE.getValue(), + appId, + shuffleSet.stream() + .map(Object::toString) + .collect(Collectors.joining(",")), + LocalStorage.STORAGE_HOST, + path)); + } return paths.stream(); } else { + if (isAuditLogEnabled) { + AUDIT_LOGGER.info( + String.format( + "%s|%s|%s|%s", + AuditType.DELETE.getValue(), appId, LocalStorage.STORAGE_HOST, path)); + } return Stream.of(basicPath); } })