This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b1bd0790 [CELEBORN-1037] Incorrect output for metrics of Prometheus
8b1bd0790 is described below

commit 8b1bd07905022c198d677d13882a09ffb7eeafff
Author: onebox-li <[email protected]>
AuthorDate: Fri Oct 13 11:18:03 2023 +0800

    [CELEBORN-1037] Incorrect output for metrics of Prometheus
    
    ### What changes were proposed in this pull request?
    The newly added `deadlocks` metric in `ThreadStatesGaugeSet` has a value of type
    Set<String>, which is not a valid (numeric) Prometheus gauge value. This change adds a filter at the `addGauge` entrance that skips non-numeric gauges.
    
    ### Why are the changes needed?
    Non-numeric gauge values produce invalid Prometheus output; see above.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes: the unused non-numeric metric is no longer exported. Note that the template uses
`metrics_jvm_thread_deadlock_count_Value` instead.
    
    ### How was this patch tested?
    Manual test
    
    Closes #1981 from onebox-li/fix-1037.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../common/metrics/source/AbstractSource.scala     |   8 +-
 .../deploy/worker/storage/DeviceMonitorSuite.scala | 420 +++++++++++----------
 2 files changed, 234 insertions(+), 194 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 21fd68d33..c4409c050 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -73,7 +73,13 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       name: String,
       labels: Map[String, String],
       gauge: Gauge[T]): Unit = {
-    namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+    // filter out non-number type gauges
+    if (gauge.getValue.isInstanceOf[Number]) {
+      namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+    } else {
+      logWarning(
+        s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is 
not a number")
+    }
   }
 
   def addGauge[T](
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index 574894814..e4eb34281 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -102,6 +102,17 @@ class DeviceMonitorSuite extends AnyFunSuite {
       |/dev/vdb   1932735283200     97710505984      1835024777216   6% 
/mnt/disk5
       |""".stripMargin
 
+  val dfBOut1DiskUsageInfo =
+    DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 
7)
+  val dfBOut2DiskUsageInfo =
+    DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 
6)
+  val dfBOut3DiskUsageInfo =
+    DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 
7)
+  val dfBOut4DiskUsageInfo =
+    DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 
6)
+  val dfBOut5DiskUsageInfo =
+    DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 
6)
+
   val dirs = new jArrayList[File]()
   val workingDir1 = ListBuffer[File](new File("/mnt/disk1/data1"))
   val workingDir2 = ListBuffer[File](new File("/mnt/disk1/data2"))
@@ -163,169 +174,183 @@ class DeviceMonitorSuite extends AnyFunSuite {
 
   test("init") {
     withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
-      when(Utils.runCommand(dfCmd)) thenReturn dfOut
-      when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
-      deviceMonitor.init()
-
-      assertEquals(2, deviceMonitor.observedDevices.size())
-
-      assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo))
-      assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo))
-
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1)
-
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1"))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2"))
-
-      assertEquals(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0),
-        new File("/mnt/disk1/data1"))
-      assertEquals(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1),
-        new File("/mnt/disk1/data2"))
-      assertEquals(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0),
-        new File("/mnt/disk2/data3"))
-      assertEquals(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1),
-        new File("/mnt/disk2/data4"))
-
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
1)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
1)
+      
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
 {
+        when(Utils.runCommand(dfCmd)) thenReturn dfOut
+        when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+          dfBOut1DiskUsageInfo)
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+          dfBOut2DiskUsageInfo)
+
+        deviceMonitor.init()
+
+        assertEquals(2, deviceMonitor.observedDevices.size())
+
+        assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo))
+        assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo))
+
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1)
+
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1"))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2"))
+
+        assertEquals(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0),
+          new File("/mnt/disk1/data1"))
+        assertEquals(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1),
+          new File("/mnt/disk1/data2"))
+        assertEquals(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0),
+          new File("/mnt/disk2/data3"))
+        assertEquals(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1),
+          new File("/mnt/disk2/data4"))
+
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
1)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
1)
+      }
     }
   }
 
   test("register/unregister/notify/report") {
     withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
-      when(Utils.runCommand(dfCmd)) thenReturn dfOut
-      when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
-      deviceMonitor.init()
-
-      val fw1 = mock[FileWriter]
-      val fw2 = mock[FileWriter]
-      val fw3 = mock[FileWriter]
-      val fw4 = mock[FileWriter]
-
-      val f1 = new File("/mnt/disk1/data1/f1")
-      val f2 = new File("/mnt/disk1/data2/f2")
-      val f3 = new File("/mnt/disk2/data3/f3")
-      val f4 = new File("/mnt/disk2/data4/f4")
-      when(fw1.getFile).thenReturn(f1)
-      when(fw2.getFile).thenReturn(f2)
-      when(fw3.getFile).thenReturn(f3)
-      when(fw4.getFile).thenReturn(f4)
-
-      deviceMonitor.registerFileWriter(fw1)
-      deviceMonitor.registerFileWriter(fw2)
-      deviceMonitor.registerFileWriter(fw3)
-      deviceMonitor.registerFileWriter(fw4)
-
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
-
-      deviceMonitor.unregisterFileWriter(fw1)
-      deviceMonitor.unregisterFileWriter(fw3)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
2)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
2)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
-
-      val df1 = mock[LocalFlusher]
-      val df2 = mock[LocalFlusher]
-      val df3 = mock[LocalFlusher]
-      val df4 = mock[LocalFlusher]
-
-      when(df1.stopFlag).thenReturn(new AtomicBoolean(false))
-      when(df2.stopFlag).thenReturn(new AtomicBoolean(false))
-      when(df3.stopFlag).thenReturn(new AtomicBoolean(false))
-      when(df4.stopFlag).thenReturn(new AtomicBoolean(false))
-
-      when(df1.mountPoint).thenReturn("/mnt/disk1")
-      when(df2.mountPoint).thenReturn("/mnt/disk1")
-      when(df3.mountPoint).thenReturn("/mnt/disk2")
-      when(df4.mountPoint).thenReturn("/mnt/disk2")
-
-      deviceMonitor.registerFlusher(df1)
-      deviceMonitor.registerFlusher(df2)
-      deviceMonitor.registerFlusher(df3)
-      deviceMonitor.registerFlusher(df4)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
4)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
4)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
-      deviceMonitor.unregisterFlusher(df1)
-      deviceMonitor.unregisterFlusher(df3)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
-      when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
-        .thenAnswer((a: String, b: List[File]) => {
-          deviceMonitor.unregisterFileWriter(fw2)
-        })
-      when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
-        .thenAnswer((a: String, b: List[File]) => {
-          deviceMonitor.unregisterFileWriter(fw4)
-        })
-      when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
-        .thenAnswer((a: String, b: List[File]) => {
-          df2.stopFlag.set(true)
-        })
-      when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
-        .thenAnswer((a: String, b: List[File]) => {
-          df4.stopFlag.set(true)
-        })
-
-      deviceMonitor.observedDevices
-        .get(vdaDeviceInfo)
-        .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG)
-      deviceMonitor.observedDevices
-        .get(vdbDeviceInfo)
-        .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
-      deviceMonitor.registerFileWriter(fw1)
-      deviceMonitor.registerFileWriter(fw2)
-      deviceMonitor.registerFileWriter(fw3)
-      deviceMonitor.registerFileWriter(fw4)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
4)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
4)
+      
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
 {
+        when(Utils.runCommand(dfCmd)) thenReturn dfOut
+        when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+          dfBOut1DiskUsageInfo)
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+          dfBOut2DiskUsageInfo)
+
+        deviceMonitor.init()
+
+        val fw1 = mock[FileWriter]
+        val fw2 = mock[FileWriter]
+        val fw3 = mock[FileWriter]
+        val fw4 = mock[FileWriter]
+
+        val f1 = new File("/mnt/disk1/data1/f1")
+        val f2 = new File("/mnt/disk1/data2/f2")
+        val f3 = new File("/mnt/disk2/data3/f3")
+        val f4 = new File("/mnt/disk2/data4/f4")
+        when(fw1.getFile).thenReturn(f1)
+        when(fw2.getFile).thenReturn(f2)
+        when(fw3.getFile).thenReturn(f3)
+        when(fw4.getFile).thenReturn(f4)
+
+        deviceMonitor.registerFileWriter(fw1)
+        deviceMonitor.registerFileWriter(fw2)
+        deviceMonitor.registerFileWriter(fw3)
+        deviceMonitor.registerFileWriter(fw4)
+
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
+
+        deviceMonitor.unregisterFileWriter(fw1)
+        deviceMonitor.unregisterFileWriter(fw3)
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
2)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
2)
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
+
+        val df1 = mock[LocalFlusher]
+        val df2 = mock[LocalFlusher]
+        val df3 = mock[LocalFlusher]
+        val df4 = mock[LocalFlusher]
+
+        when(df1.stopFlag).thenReturn(new AtomicBoolean(false))
+        when(df2.stopFlag).thenReturn(new AtomicBoolean(false))
+        when(df3.stopFlag).thenReturn(new AtomicBoolean(false))
+        when(df4.stopFlag).thenReturn(new AtomicBoolean(false))
+
+        when(df1.mountPoint).thenReturn("/mnt/disk1")
+        when(df2.mountPoint).thenReturn("/mnt/disk1")
+        when(df3.mountPoint).thenReturn("/mnt/disk2")
+        when(df4.mountPoint).thenReturn("/mnt/disk2")
+
+        deviceMonitor.registerFlusher(df1)
+        deviceMonitor.registerFlusher(df2)
+        deviceMonitor.registerFlusher(df3)
+        deviceMonitor.registerFlusher(df4)
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
4)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
4)
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+        deviceMonitor.unregisterFlusher(df1)
+        deviceMonitor.unregisterFlusher(df3)
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+        when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
+          .thenAnswer((a: String, b: List[File]) => {
+            deviceMonitor.unregisterFileWriter(fw2)
+          })
+        when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
+          .thenAnswer((a: String, b: List[File]) => {
+            deviceMonitor.unregisterFileWriter(fw4)
+          })
+        when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
+          .thenAnswer((a: String, b: List[File]) => {
+            df2.stopFlag.set(true)
+          })
+        when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
+          .thenAnswer((a: String, b: List[File]) => {
+            df4.stopFlag.set(true)
+          })
+
+        deviceMonitor.observedDevices
+          .get(vdaDeviceInfo)
+          .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG)
+        deviceMonitor.observedDevices
+          .get(vdbDeviceInfo)
+          .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG)
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
3)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
3)
+        assert(
+          
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+        assert(
+          
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+        
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+        deviceMonitor.registerFileWriter(fw1)
+        deviceMonitor.registerFileWriter(fw2)
+        deviceMonitor.registerFileWriter(fw3)
+        deviceMonitor.registerFileWriter(fw4)
+        
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
4)
+        
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
4)
+      }
     }
   }
 
@@ -350,45 +375,52 @@ class DeviceMonitorSuite extends AnyFunSuite {
 
   test("monitor non-critical error metrics") {
     withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
-      when(Utils.runCommand(dfCmd)) thenReturn dfOut
-      when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
-      deviceMonitor.init()
-
-      val device1 = deviceMonitor.observedDevices.values().asScala.head
-      val mountPoints1 = device1.diskInfos.keySet().asScala.toList
-
-      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
-      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
-      val deviceMonitorMetrics =
-        workerSource.gauges().filter(_.name.startsWith("Device_" + 
device1.deviceInfo.name))
-          .sortBy(_.name)
-
-      assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
-      assertEquals("Device_vda_ReadOrWriteFailure_Count", 
deviceMonitorMetrics.last.name)
-      assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
-      assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
-
-      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
-      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
-      assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
-      assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+      
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
 {
+        when(Utils.runCommand(dfCmd)) thenReturn dfOut
+        when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+          dfBOut1DiskUsageInfo)
+        
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+          dfBOut2DiskUsageInfo)
+
+        deviceMonitor.init()
+
+        val device1 = deviceMonitor.observedDevices.values().asScala.head
+        val mountPoints1 = device1.diskInfos.keySet().asScala.toList
+
+        device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
+        device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
+        val deviceMonitorMetrics =
+          workerSource.gauges().filter(_.name.startsWith("Device_" + 
device1.deviceInfo.name))
+            .sortBy(_.name)
+
+        assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
+        assertEquals("Device_vda_ReadOrWriteFailure_Count", 
deviceMonitorMetrics.last.name)
+        assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
+        assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
+
+        device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
+        device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
+        assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
+        assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+      }
     }
   }
 
   test("monitor device usage metrics") {
 
     
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
 {
-      val dfBOut1 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1293858897920L, 102005473280L, 7)
-      val dfBOut2 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
-      val dfBOut3 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1293858897920L, 102005473280L, 7)
-      val dfBOut4 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
-      val dfBOut5 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut1)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(dfBOut2)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(dfBOut3)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(dfBOut4)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(dfBOut5)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(
+        dfBOut1DiskUsageInfo)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(
+        dfBOut2DiskUsageInfo)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(
+        dfBOut3DiskUsageInfo)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(
+        dfBOut4DiskUsageInfo)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(
+        dfBOut5DiskUsageInfo)
 
       deviceMonitor2.init()
 
@@ -423,8 +455,10 @@ class DeviceMonitorSuite extends AnyFunSuite {
       assertEquals("vdb", metrics4.last.labels("device"))
       assertEquals(1024L * 3, metrics4.last.gauge.getValue)
 
-      val dfBOut6 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1264867868672L, 130996502528L, 9)
-      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut6)
+      val dfBOut6DiskUsageInfo =
+        DeviceMonitor.DiskUsageInfo(1395864371200L, 1264867868672L, 
130996502528L, 9)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(
+        dfBOut6DiskUsageInfo)
       assertEquals(1264867868672L, metrics2.head.gauge.getValue)
     }
   }

Reply via email to