This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f13dfb74 [CELEBORN-113][FEATURE] Add metrics to monitor non-critical
error number on local device (#1100)
f13dfb74 is described below
commit f13dfb7421c6fe1ba4a8edd7ab08c70c73f5c441
Author: nafiy <[email protected]>
AuthorDate: Tue Dec 20 22:30:55 2022 +0800
[CELEBORN-113][FEATURE] Add metrics to monitor non-critical error number on
local device (#1100)
---
.../apache/celeborn/common/meta/DiskStatus.java | 13 +++++++++
.../celeborn/common/meta/DiskStatusSuiteJ.java} | 24 ++++++++---------
.../deploy/worker/storage/DeviceMonitor.scala | 20 +++++++++-----
.../deploy/worker/storage/StorageManager.scala | 2 +-
.../deploy/worker/storage/DeviceMonitorSuite.scala | 31 +++++++++++++++++++++-
5 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
index c7b6aff9..edf31327 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
@@ -34,4 +34,17 @@ public enum DiskStatus {
public final byte getValue() {
return value;
}
+
+ public final String toMetric() {
+ String[] fragments = this.name().split("_");
+ String metric = "";
+ for (String fragment : fragments) {
+ int len = fragment.length();
+ if (len >= 1) {
+ metric += fragment.substring(0, 1).toUpperCase();
+ metric += fragment.substring(1, len).toLowerCase();
+ }
+ }
+ return metric;
+ }
}
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java b/common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
similarity index 60%
copy from common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
copy to common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
index c7b6aff9..d080ed3e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
+++ b/common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
@@ -17,21 +17,19 @@
package org.apache.celeborn.common.meta;
-public enum DiskStatus {
- HEALTHY(0),
- READ_OR_WRITE_FAILURE(1),
- IO_HANG(2),
- HIGH_DISK_USAGE(3),
- CRITICAL_ERROR(4);
+import static org.junit.Assert.*;
- private final byte value;
+import org.junit.Test;
- DiskStatus(int value) {
- assert (value >= 0 && value < 256);
- this.value = (byte) value;
- }
+public class DiskStatusSuiteJ {
- public final byte getValue() {
- return value;
+ @Test
+ public void testDiskStatusToMetric() throws Exception {
+ assertEquals(DiskStatus.values().length, 5);
+ assertEquals(DiskStatus.HEALTHY.toMetric(), "Healthy");
+ assertEquals(DiskStatus.READ_OR_WRITE_FAILURE.toMetric(), "ReadOrWriteFailure");
+ assertEquals(DiskStatus.IO_HANG.toMetric(), "IoHang");
+ assertEquals(DiskStatus.HIGH_DISK_USAGE.toMetric(), "HighDiskUsage");
+ assertEquals(DiskStatus.CRITICAL_ERROR.toMetric(), "CriticalError");
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index ebc8ad6a..ef5ad5e6 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
+import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
import org.apache.celeborn.common.util.Utils._
@@ -51,10 +52,11 @@ class LocalDeviceMonitor(
conf: CelebornConf,
observer: DeviceObserver,
deviceInfos: util.Map[String, DeviceInfo],
- diskInfos: util.Map[String, DiskInfo]) extends DeviceMonitor {
+ diskInfos: util.Map[String, DiskInfo],
+ workerSource: AbstractSource) extends DeviceMonitor {
val logger = LoggerFactory.getLogger(classOf[LocalDeviceMonitor])
- class ObservedDevice(val deviceInfo: DeviceInfo) {
+ class ObservedDevice(val deviceInfo: DeviceInfo, workerSource: AbstractSource) {
val diskInfos = new ConcurrentHashMap[String, DiskInfo]()
deviceInfo.diskInfos.foreach { case diskInfo =>
diskInfos.put(diskInfo.mountPoint, diskInfo)
@@ -101,7 +103,11 @@ class LocalDeviceMonitor(
this.synchronized {
val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, util.Set[Long]] {
override def apply(t: DiskStatus): util.Set[Long] = {
- ConcurrentHashMap.newKeySet[Long]()
+ val set = ConcurrentHashMap.newKeySet[Long]()
+ workerSource.addGauge(
+ s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count",
+ _ => set.size())
+ set
}
}
nonCriticalErrors.computeIfAbsent(diskStatus, nonCriticalErrorSetFunc).add(
@@ -221,7 +227,7 @@ class LocalDeviceMonitor(
s"because noDevice device $deviceName exists.")
}
deviceInfos.asScala.foreach(entry => {
- val observedDevice = new ObservedDevice(entry._2)
+ val observedDevice = new ObservedDevice(entry._2, workerSource)
observedDevice.addObserver(observer)
observedDevices.put(entry._2, observedDevice)
})
@@ -331,10 +337,12 @@ object DeviceMonitor {
conf: CelebornConf,
deviceObserver: DeviceObserver,
deviceInfos: util.Map[String, DeviceInfo],
- diskInfos: util.Map[String, DiskInfo]): DeviceMonitor = {
+ diskInfos: util.Map[String, DiskInfo],
+ workerSource: AbstractSource): DeviceMonitor = {
try {
if (conf.diskMonitorEnabled) {
- val monitor = new LocalDeviceMonitor(conf, deviceObserver, deviceInfos, diskInfos)
+ val monitor =
+ new LocalDeviceMonitor(conf, deviceObserver, deviceInfos, diskInfos, workerSource)
monitor.init()
logger.info("Device monitor init success")
monitor
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 49307519..5d75e03e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -92,7 +92,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
tmpDiskInfos.put(diskInfo.mountPoint, diskInfo)
}
private val deviceMonitor =
- DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos)
+ DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, workerSource)
// (mountPoint -> LocalFlusher)
private val localFlushers: ConcurrentHashMap[String, LocalFlusher] = {
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index ca76c7bf..ccf6ca77 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.celeborn.common.CelebornConf.WORKER_DISK_MONITOR_CHECK_INTERVA
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.service.deploy.worker.WorkerSource
class DeviceMonitorSuite extends AnyFunSuite {
val dfCmd = "df -ah"
@@ -66,6 +67,7 @@ class DeviceMonitorSuite extends AnyFunSuite {
val conf = new CelebornConf()
conf.set(WORKER_DISK_MONITOR_CHECK_INTERVAL.key, "3600s")
+ val workerSource = new WorkerSource(conf)
val storageManager = mock[DeviceObserver]
var (deviceInfos, diskInfos, workingDirDiskInfos): (
@@ -82,7 +84,7 @@ class DeviceMonitorSuite extends AnyFunSuite {
diskInfos = tdiskInfos
}
val deviceMonitor =
- new LocalDeviceMonitor(conf, storageManager, deviceInfos, diskInfos)
+ new LocalDeviceMonitor(conf, storageManager, deviceInfos, diskInfos, workerSource)
val vdaDeviceInfo = new DeviceInfo("vda")
val vdbDeviceInfo = new DeviceInfo("vdb")
@@ -273,4 +275,31 @@ class DeviceMonitorSuite extends AnyFunSuite {
})
DeviceMonitor.deviceCheckThreadPool.shutdownNow()
}
+
+ test("monitor non-critical error metrics") {
+ withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
+ when(Utils.runCommand(dfCmd)) thenReturn dfOut
+ when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+ deviceMonitor.init()
+
+ val device1 = deviceMonitor.observedDevices.values().asScala.head
+ val mountPoints1 = device1.diskInfos.keySet().asScala.toList
+
+ device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE)
+ device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG)
+ val deviceMonitorMetrics =
+ workerSource.gauges().filter(_.name.startsWith("Device")).sortBy(_.name)
+
+ assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
+ assertEquals("Device_vda_ReadOrWriteFailure_Count", deviceMonitorMetrics.last.name)
+ assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
+ assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
+
+ device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE)
+ device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG)
+ assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
+ assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+ }
+ }
}