This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new bfa341c32 [CELEBORN-255] Add counter of outstandingFetches, 
outstandingRpcs and outstandingPushes to metrics
bfa341c32 is described below

commit bfa341c32f362b64dd69b84fc54934cf8224200c
Author: SteNicholas <[email protected]>
AuthorDate: Mon Oct 16 21:16:57 2023 +0800

    [CELEBORN-255] Add counter of outstandingFetches, outstandingRpcs and 
outstandingPushes to metrics
    
    ### What changes were proposed in this pull request?
    
    Add counters for `outstandingFetches`, `outstandingRpcs` and 
`outstandingPushes` of `TransportResponseHandler` to the metrics of the Celeborn Worker.
    
    ### Why are the changes needed?
    
    Exposing the sizes of `outstandingFetches`, `outstandingRpcs` and 
`outstandingPushes` of `TransportResponseHandler` as metrics makes it possible 
to monitor the number of outstanding fetch, RPC and push requests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `TransportResponseHandlerSuiteJ`
    
    Closes #1992 from SteNicholas/CELEBORN-255.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 METRICS.md                                         |   7 +-
 assets/grafana/celeborn-dashboard.json             | 284 +++++++++++++++++++++
 .../celeborn/common/network/TransportContext.java  |   2 +-
 .../network/client/TransportResponseHandler.java   |  41 ++-
 .../common/metrics/source/AbstractSource.scala     |  33 +++
 .../network/TransportResponseHandlerSuiteJ.java    |  90 ++++---
 docs/monitoring.md                                 |   6 +
 7 files changed, 404 insertions(+), 59 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 73ade7832..b9b76e3c8 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -73,8 +73,8 @@ Here is an example of grafana dashboard importing.
 |             FlushDataTime              |      worker       |                 
                 FlushData means flush a disk buffer to disk.                   
                |
 |             OpenStreamTime             |      worker       |            
OpenStream means read a shuffle file and send client about chunks size and 
stream index.             |
 |             FetchChunkTime             |      worker       |                 
     FetchChunk means read a chunk from a shuffle file and send to client.      
                |
-|           PrimaryPushDataTime          |      worker       |                 
      PrimaryPushData means handle pushdata of primary partition location.      
                  |
-|           ReplicaPushDataTime          |      worker       |                 
       ReplicaPushData means handle pushdata of replica partition location.     
                    |
+|          PrimaryPushDataTime           |      worker       |                 
     PrimaryPushData means handle pushdata of primary partition location.       
                |
+|          ReplicaPushDataTime           |      worker       |                 
     ReplicaPushData means handle pushdata of replica partition location.       
                |
 |           WriteDataFailCount           |      worker       |                 
   The count of writing PushData or PushMergedData failed in current worker.    
                |
 |         ReplicateDataFailCount         |      worker       |                 
 The count of replicating PushData or PushMergedData failed in current worker.  
                |
 |      ReplicateDataWriteFailCount       |      worker       |       The count 
of replicating PushData or PushMergedData failed caused by write failure in 
peer worker.        |
@@ -92,6 +92,9 @@ Here is an example of grafana dashboard importing.
 |               DiskBuffer               |      worker       | Disk buffers 
are part of netty used memory, means data need to write to disk but haven't 
been written to disk.  |
 |             PausePushData              |      worker       |                 
  PausePushData means the count of worker stopped receiving data from client.   
                |
 |       PausePushDataAndReplicate        |      worker       |    
PausePushDataAndReplicate means the count of worker stopped receiving data from 
client and other workers.    |
+|         OutstandingFetchCount          |      worker       |                 
        The count of outstanding fetch request received in peer worker.         
                |
+|          OutstandingRpcCount           |      worker       |                 
         The count of outstanding rpc request received in peer worker.          
                |
+|          OutstandingPushCount          |      worker       |                 
        The count of outstanding push request received in peer worker.          
                |
 
 ## Implementation
 
diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index a2e6a1e62..146b33cc5 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -7671,6 +7671,290 @@
       ],
       "title": "UserConsumption",
       "type": "row"
+    },
+    {
+      "collapsed": true,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 12
+      },
+      "id": 183,
+      "panels": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 81
+          },
+          "id": 184,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_OutstandingFetchCount_Count",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_OutstandingFetchCount_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 81
+          },
+          "id": 185,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_OutstandingRpcCount_Count",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_OutstandingRpcCount_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 81
+          },
+          "id": 186,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_OutstandingPushCount_Count",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_OutstandingPushCount_Count",
+          "type": "timeseries"
+        }
+      ],
+      "title": "OutstandingRequest",
+      "type": "row"
     }
   ],
   "refresh": "5s",
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 50bb5cac1..52c68367a 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -141,7 +141,7 @@ public class TransportContext {
 
   private TransportChannelHandler createChannelHandler(
       Channel channel, BaseMessageHandler msgHandler) {
-    TransportResponseHandler responseHandler = new 
TransportResponseHandler(conf, channel);
+    TransportResponseHandler responseHandler = new 
TransportResponseHandler(conf, channel, source);
     TransportClient client = new TransportClient(channel, responseHandler);
     TransportRequestHandler requestHandler =
         new TransportRequestHandler(channel, client, msgHandler);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index ddace8d87..aa57b4e0b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -26,11 +26,15 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.network.protocol.*;
 import org.apache.celeborn.common.network.server.MessageHandler;
 import org.apache.celeborn.common.network.util.NettyUtils;
@@ -62,23 +66,25 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   /** Records the time (in system nanoseconds) that the last fetch or RPC 
request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  private final long pushTimeoutCheckerInterval;
+  private final AbstractSource source;
+
   private static ScheduledExecutorService pushTimeoutChecker = null;
-  private ScheduledFuture pushCheckerScheduleFuture;
+  private ScheduledFuture<?> pushCheckerScheduleFuture;
 
-  private final long fetchTimeoutCheckerInterval;
   private static ScheduledExecutorService fetchTimeoutChecker = null;
-  private ScheduledFuture fetchCheckerScheduleFuture;
+  private ScheduledFuture<?> fetchCheckerScheduleFuture;
 
-  public TransportResponseHandler(TransportConf conf, Channel channel) {
+  public TransportResponseHandler(
+      TransportConf conf, Channel channel, @Nullable AbstractSource source) {
     this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = JavaUtils.newConcurrentHashMap();
     this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
     this.outstandingPushes = JavaUtils.newConcurrentHashMap();
     this.timeOfLastRequestNs = new AtomicLong(0);
-    this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
-    this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+    this.source = source;
+    long pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    long fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
 
     String module = conf.getModuleName();
     boolean checkPushTimeout = false;
@@ -110,7 +116,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (checkPushTimeout) {
       pushCheckerScheduleFuture =
           pushTimeoutChecker.scheduleWithFixedDelay(
-              () -> failExpiredPushRequest(),
+              this::failExpiredPushRequest,
               pushTimeoutCheckerInterval,
               pushTimeoutCheckerInterval,
               TimeUnit.MILLISECONDS);
@@ -119,11 +125,13 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (checkFetchTimeout) {
       fetchCheckerScheduleFuture =
           fetchTimeoutChecker.scheduleWithFixedDelay(
-              () -> failExpiredFetchRequest(),
+              this::failExpiredFetchRequest,
               fetchTimeoutCheckerInterval,
               fetchTimeoutCheckerInterval,
               TimeUnit.MILLISECONDS);
     }
+
+    registerMetrics();
   }
 
   public void failExpiredPushRequest() {
@@ -178,6 +186,14 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     }
   }
 
+  private void registerMetrics() {
+    if (source != null) {
+      source.addGauge("OutstandingFetchCount", outstandingFetches::size);
+      source.addGauge("OutstandingRpcCount", outstandingRpcs::size);
+      source.addGauge("OutstandingPushCount", outstandingPushes::size);
+    }
+  }
+
   public void addFetchRequest(StreamChunkSlice streamChunkSlice, 
FetchRequestInfo info) {
     updateTimeOfLastRequest();
     if (outstandingFetches.containsKey(streamChunkSlice)) {
@@ -267,7 +283,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (numOutstandingRequests() > 0) {
       // show the details of outstanding Fetches
       if (logger.isDebugEnabled()) {
-        if (outstandingFetches.size() > 0) {
+        if (!outstandingFetches.isEmpty()) {
           for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e : 
outstandingFetches.entrySet()) {
             StreamChunkSlice key = e.getKey();
             logger.debug("The channel is closed, but there is still 
outstanding Fetch {}", key);
@@ -455,4 +471,9 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
           streamChunkSlice);
     }
   }
+
+  @VisibleForTesting
+  public AbstractSource source() {
+    return source;
+  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index c4409c050..bfa01b428 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -146,6 +146,39 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedGauges.asScala.toList
   }
 
+  /**
+   * Gets the named gauge of the metric given the metric name.
+   *
+   * Note: This method is exposed to test the value of the gauge for metric.
+   *
+   * @param name The metric name.
+   * @return The corresponding named gauge.
+   */
+  def getGauge(name: String): NamedGauge[_] = {
+    getGauge(name, Map.empty)
+  }
+
+  /**
+   * Gets the named gauge of the metric given the metric name and labels.
+   *
+   * Note: This method is exposed to test the value of the gauge for metric.
+   *
+   * @param name The metric name.
+   * @param labels The metric labels.
+   * @return The corresponding named gauge.
+   */
+  def getGauge(name: String, labels: Map[String, String] = Map.empty): 
NamedGauge[_] = {
+    val labelString = MetricLabels.labelString(labels ++ staticLabels)
+    val iter = namedGauges.iterator()
+    while (iter.hasNext) {
+      val namedGauge = iter.next()
+      if (namedGauge.name.equals(name) && 
namedGauge.labelString.equals(labelString)) {
+        return namedGauge
+      }
+    }
+    null
+  }
+
   protected def histograms(): List[NamedHistogram] = {
     List.empty[NamedHistogram]
   }
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
index 3dd32334f..cc781c1da 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
@@ -27,6 +27,7 @@ import io.netty.channel.local.LocalChannel;
 import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
 import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
 import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -41,50 +42,40 @@ public class TransportResponseHandlerSuiteJ {
   @Test
   public void handleSuccessfulFetch() throws Exception {
     StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
-
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.FETCH_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(streamChunkSlice, info);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
 
     handler.handle(new ChunkFetchSuccess(streamChunkSlice, new 
TestManagedBuffer(123)));
     verify(callback, times(1)).onSuccess(eq(0), any());
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
   }
 
   @Test
   public void handleFailedFetch() throws Exception {
     StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.FETCH_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(streamChunkSlice, info);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
 
     handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
     verify(callback, times(1)).onFailure(eq(0), any());
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
   }
 
   @Test
   public void clearAllOutstandingRequests() throws Exception {
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
     handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
     handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
-    assertEquals(3, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 3);
 
     handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new 
TestManagedBuffer(12)));
     handler.exceptionCaught(new Exception("duh duh duhhhh"));
@@ -93,58 +84,49 @@ public class TransportResponseHandlerSuiteJ {
     verify(callback, times(1)).onSuccess(eq(0), any());
     verify(callback, times(1)).onFailure(eq(1), any());
     verify(callback, times(1)).onFailure(eq(2), any());
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
   }
 
   @Test
   public void handleSuccessfulRPC() throws Exception {
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.RPC_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.RPC_MODULE);
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     handler.addRpcRequest(12345, callback);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
 
     // This response should be ignored.
     handler.handle(new RpcResponse(54321, new 
NioManagedBuffer(ByteBuffer.allocate(7))));
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
 
     ByteBuffer resp = ByteBuffer.allocate(10);
     handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
     verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
   }
 
   @Test
   public void handleFailedRPC() throws Exception {
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.RPC_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.RPC_MODULE);
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     handler.addRpcRequest(12345, callback);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
 
     handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
 
     handler.handle(new RpcFailure(12345, "oh no"));
     verify(callback, times(1)).onFailure(any());
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
   }
 
   @Test
   public void handleSuccessfulPush() throws Exception {
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() + 
30000, callback);
     info.setChannelFuture(mock(ChannelFuture.class));
     handler.addPushRequest(12345, info);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
 
     // This response should be ignored.
     handler.handle(new RpcResponse(54321, new 
NioManagedBuffer(ByteBuffer.allocate(7))));
@@ -153,26 +135,42 @@ public class TransportResponseHandlerSuiteJ {
     ByteBuffer resp = ByteBuffer.allocate(10);
     handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
     verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingPushCount", 0);
   }
 
   @Test
   public void handleFailedPush() throws Exception {
-    TransportResponseHandler handler =
-        new TransportResponseHandler(
-            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
-            new LocalChannel());
+    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() + 
30000L, callback);
     info.setChannelFuture(mock(ChannelFuture.class));
     handler.addPushRequest(12345, info);
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
 
     handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
-    assertEquals(1, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
 
     handler.handle(new RpcFailure(12345, "oh no"));
     verify(callback, times(1)).onFailure(any());
-    assertEquals(0, handler.numOutstandingRequests());
+    assertOutstandingRequests(handler, "OutstandingPushCount", 0);
+  }
+
+  private TransportResponseHandler createResponseHandler(String module) {
+    CelebornConf celebornConf = new CelebornConf();
+    return new TransportResponseHandler(
+        Utils.fromCelebornConf(celebornConf, module, 8),
+        new LocalChannel(),
+        new AbstractSource(celebornConf, "Worker") {
+          @Override
+          public String sourceName() {
+            return "worker";
+          }
+        });
+  }
+
+  private void assertOutstandingRequests(
+      TransportResponseHandler handler, String name, int expected) {
+    assertEquals(expected, handler.numOutstandingRequests());
+    assertEquals(expected, handler.source().getGauge(name).gauge().getValue());
   }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 54dfcf1ec..b799b5717 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -189,6 +189,12 @@ These metrics are exposed by Celeborn worker.
     - PotentialConsumeSpeed
     - UserProduceSpeed
     - WorkerConsumeSpeed
+    - OutstandingFetchCount
+        - The count of outstanding fetch request.
+    - OutstandingRpcCount
+        - The count of outstanding rpc request.
+    - OutstandingPushCount
+        - The count of outstanding push request.
     - push_server_usedHeapMemory 
     - push_server_usedDirectMemory
     - push_server_numAllocations 

Reply via email to