This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new b4ed85429 [#1903] improvement(server): Add some metrics for shuffle server (#1904) b4ed85429 is described below commit b4ed85429c601216508d48b6e6c9b848f032b797 Author: kqhzz <kuangqinghu...@163.com> AuthorDate: Thu Jul 18 02:38:28 2024 +0800 [#1903] improvement(server): Add some metrics for shuffle server (#1904) ### What changes were proposed in this pull request? Add storage metrics for the shuffle server: total_delete_data, total_flush_file_num, total_delete_file_num, storage_used_bytes and flush_file_num. ### Why are the changes needed? These metrics help us understand the status of the shuffle server. Fix: #1903 ### Does this PR introduce _any_ user-facing change? No. --- .../common/storage/ApplicationStorageInfo.java | 56 ++++++++++++++++++++++ .../uniffle/server/ShuffleServerMetrics.java | 15 ++++++ .../server/storage/HadoopStorageManager.java | 1 + .../server/storage/LocalStorageManager.java | 1 + .../server/storage/SingleStorageManager.java | 27 +++++++++++ .../uniffle/storage/common/AbstractStorage.java | 5 ++ .../org/apache/uniffle/storage/common/Storage.java | 2 + 7 files changed, 107 insertions(+) diff --git a/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java new file mode 100644 index 000000000..06ddcdf1f --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.storage; + +import java.util.concurrent.atomic.AtomicLong; + +public class ApplicationStorageInfo { + private String appId; + private AtomicLong fileNum; + private AtomicLong usedBytes; + + public ApplicationStorageInfo(String appId) { + this.appId = appId; + this.fileNum = new AtomicLong(); + this.usedBytes = new AtomicLong(); + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public long getFileNum() { + return fileNum.get(); + } + + public void incFileNum(long fileNum) { + this.fileNum.addAndGet(fileNum); + } + + public long getUsedBytes() { + return usedBytes.get(); + } + + public void incUsedBytes(long usedBytes) { + this.usedBytes.addAndGet(usedBytes); + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index 8eada73c5..309cee553 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -39,6 +39,11 @@ public class ShuffleServerMetrics { private static final String TOTAL_RECEIVED_DATA = "total_received_data"; private static final String TOTAL_WRITE_DATA = "total_write_data"; + private static final String TOTAL_DELETE_DATA = "total_delete_data"; + private static final String TOTAL_FLUSH_FILE_NUM = "total_flush_file_num"; + private static final String TOTAL_DELETE_FILE_NUM = "total_delete_file_num"; + private 
static final String STORAGE_USED_BYTES = "storage_used_bytes"; + private static final String FLUSH_FILE_NUM = "flush_file_num"; private static final String TOTAL_WRITE_BLOCK = "total_write_block"; private static final String WRITE_BLOCK_SIZE = "write_block_size"; private static final String TOTAL_WRITE_TIME = "total_write_time"; @@ -155,6 +160,11 @@ public class ShuffleServerMetrics { public static Counter.Child counterTotalReceivedDataSize; public static Counter.Child counterTotalWriteDataSize; + public static Counter.Child counterTotalDeleteDataSize; + public static Counter.Child counterTotalFlushFileNum; + public static Counter.Child counterTotalDeleteFileNum; + public static Gauge.Child gaugeStorageUsedBytes; + public static Gauge.Child gaugeFlushFileNum; public static Counter.Child counterTotalWriteBlockSize; public static Histogram appHistogramWriteBlockSize; public static Counter.Child counterTotalWriteTime; @@ -337,6 +347,11 @@ public class ShuffleServerMetrics { private static void setUpMetrics(ShuffleServerConf serverConf) { counterTotalReceivedDataSize = metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA); counterTotalWriteDataSize = metricsManager.addLabeledCounter(TOTAL_WRITE_DATA); + counterTotalDeleteDataSize = metricsManager.addLabeledCounter(TOTAL_DELETE_DATA); + counterTotalFlushFileNum = metricsManager.addLabeledCounter(TOTAL_FLUSH_FILE_NUM); + counterTotalDeleteFileNum = metricsManager.addLabeledCounter(TOTAL_DELETE_FILE_NUM); + gaugeStorageUsedBytes = metricsManager.addLabeledGauge(STORAGE_USED_BYTES); + gaugeFlushFileNum = metricsManager.addLabeledGauge(FLUSH_FILE_NUM); counterTotalWriteBlockSize = metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK); appHistogramWriteBlockSize = metricsManager.addHistogram( diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 90d62a1ed..66c4349cf 100644 --- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -148,6 +148,7 @@ public class HadoopStorageManager extends SingleStorageManager { } } deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); + removeAppStorageInfo(event); } else { LOG.warn("Storage gotten is null when removing resources for event: {}", event); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index c55fe9060..488629544 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -326,6 +326,7 @@ public class LocalStorageManager extends SingleStorageManager { .collect(Collectors.toList()); deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); + removeAppStorageInfo(event); } private void cleanupStorageSelectionCache(PurgeEvent event) { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java index 6acead608..fb974d5f5 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server.storage; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; @@ -26,10 +27,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.storage.ApplicationStorageInfo; +import org.apache.uniffle.common.util.JavaUtils; import 
org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageWriteMetrics; import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; @@ -42,6 +46,8 @@ public abstract class SingleStorageManager implements StorageManager { private final long eventSizeThresholdL1; private final long eventSizeThresholdL2; private final long eventSizeThresholdL3; + protected final Map<String, ApplicationStorageInfo> appStorageInfoMap = + JavaUtils.newConcurrentMap(); public SingleStorageManager(ShuffleServerConf conf) { writeSlowThreshold = conf.getSizeAsBytes(ShuffleServerConf.SERVER_WRITE_SLOW_THRESHOLD); @@ -92,6 +98,16 @@ public abstract class SingleStorageManager implements StorageManager { } else { ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc(); } + String appId = event.getAppId(); + ApplicationStorageInfo appStorage = + appStorageInfoMap.computeIfAbsent(appId, id -> new ApplicationStorageInfo(appId)); + appStorage.incUsedBytes(event.getSize()); + ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getSize()); + if (event.getUnderStorage().containsWriteHandler(appId)) { + appStorage.incFileNum(1); + ShuffleServerMetrics.gaugeFlushFileNum.inc(); + ShuffleServerMetrics.counterTotalFlushFileNum.inc(); + } Storage storage = event.getUnderStorage(); if (storage != null) { storage.updateWriteMetrics(metrics); @@ -149,4 +165,15 @@ public abstract class SingleStorageManager implements StorageManager { public void stop() { // do nothing } + + public void removeAppStorageInfo(PurgeEvent event) { + String appId = event.getAppId(); + ApplicationStorageInfo info = appStorageInfoMap.remove(appId); + if (info != null) { + 
ShuffleServerMetrics.gaugeStorageUsedBytes.dec(info.getUsedBytes()); + ShuffleServerMetrics.gaugeFlushFileNum.dec(info.getFileNum()); + ShuffleServerMetrics.counterTotalDeleteDataSize.inc(info.getUsedBytes()); + ShuffleServerMetrics.counterTotalDeleteFileNum.inc(info.getFileNum()); + } + } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java index f8bf8c9b4..ef2b14b80 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java +++ b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java @@ -68,6 +68,11 @@ public abstract class AbstractStorage implements Storage { protected abstract ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request); + @Override + public boolean containsWriteHandler(String appId) { + return writerHandlers.containsKey(appId); + } + public boolean containsWriteHandler(String appId, int shuffleId, int partition) { Map<String, ShuffleWriteHandler> map = writerHandlers.get(appId); if (map == null || map.isEmpty()) { diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java index 43168c324..80f36a7bd 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java +++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java @@ -35,6 +35,8 @@ public interface Storage { ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) throws IOException; + boolean containsWriteHandler(String appId); + ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest request); void removeHandlers(String appId);