This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bfa341c32 [CELEBORN-255] Add counter of outstandingFetches,
outstandingRpcs and outstandingPushes to metrics
bfa341c32 is described below
commit bfa341c32f362b64dd69b84fc54934cf8224200c
Author: SteNicholas <[email protected]>
AuthorDate: Mon Oct 16 21:16:57 2023 +0800
[CELEBORN-255] Add counter of outstandingFetches, outstandingRpcs and
outstandingPushes to metrics
### What changes were proposed in this pull request?
Add counters for `outstandingFetches`, `outstandingRpcs` and
`outstandingPushes` of `TransportResponseHandler` to the Celeborn Worker metrics.
### Why are the changes needed?
Exposing the sizes of `outstandingFetches`, `outstandingRpcs` and
`outstandingPushes` of `TransportResponseHandler` as metrics makes it possible to
monitor how many fetch, RPC and push requests are still waiting for a response.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`TransportResponseHandlerSuiteJ`
Closes #1992 from SteNicholas/CELEBORN-255.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
METRICS.md | 7 +-
assets/grafana/celeborn-dashboard.json | 284 +++++++++++++++++++++
.../celeborn/common/network/TransportContext.java | 2 +-
.../network/client/TransportResponseHandler.java | 41 ++-
.../common/metrics/source/AbstractSource.scala | 33 +++
.../network/TransportResponseHandlerSuiteJ.java | 90 ++++---
docs/monitoring.md | 6 +
7 files changed, 404 insertions(+), 59 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index 73ade7832..b9b76e3c8 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -73,8 +73,8 @@ Here is an example of grafana dashboard importing.
| FlushDataTime | worker |
FlushData means flush a disk buffer to disk.
|
| OpenStreamTime | worker |
OpenStream means read a shuffle file and send client about chunks size and
stream index. |
| FetchChunkTime | worker |
FetchChunk means read a chunk from a shuffle file and send to client.
|
-| PrimaryPushDataTime | worker |
PrimaryPushData means handle pushdata of primary partition location.
|
-| ReplicaPushDataTime | worker |
ReplicaPushData means handle pushdata of replica partition location.
|
+| PrimaryPushDataTime | worker |
PrimaryPushData means handle pushdata of primary partition location.
|
+| ReplicaPushDataTime | worker |
ReplicaPushData means handle pushdata of replica partition location.
|
| WriteDataFailCount | worker |
The count of writing PushData or PushMergedData failed in current worker.
|
| ReplicateDataFailCount | worker |
The count of replicating PushData or PushMergedData failed in current worker.
|
| ReplicateDataWriteFailCount | worker | The count
of replicating PushData or PushMergedData failed caused by write failure in
peer worker. |
@@ -92,6 +92,9 @@ Here is an example of grafana dashboard importing.
| DiskBuffer | worker | Disk buffers
are part of netty used memory, means data need to write to disk but haven't
been written to disk. |
| PausePushData | worker |
PausePushData means the count of worker stopped receiving data from client.
|
| PausePushDataAndReplicate | worker |
PausePushDataAndReplicate means the count of worker stopped receiving data from
client and other workers. |
+| OutstandingFetchCount | worker |
The count of outstanding fetch requests that have been sent but have not yet received a response.
|
+| OutstandingRpcCount | worker |
The count of outstanding rpc requests that have been sent but have not yet received a response.
|
+| OutstandingPushCount | worker |
The count of outstanding push requests that have been sent but have not yet received a response.
|
## Implementation
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index a2e6a1e62..146b33cc5 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -7671,6 +7671,290 @@
],
"title": "UserConsumption",
"type": "row"
+ },
+ {
+ "collapsed": true,
+ "gridPos": {
+ "h": 1,
+ "w": 24,
+ "x": 0,
+ "y": 12
+ },
+ "id": 183,
+ "panels": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 81
+ },
+ "id": 184,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_OutstandingFetchCount_Count",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_OutstandingFetchCount_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 81
+ },
+ "id": 185,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_OutstandingRpcCount_Count",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_OutstandingRpcCount_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 89
+ },
+ "id": 186,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_OutstandingPushCount_Count",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_OutstandingPushCount_Count",
+ "type": "timeseries"
+ }
+ ],
+ "title": "OutstandingRequest",
+ "type": "row"
}
],
"refresh": "5s",
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 50bb5cac1..52c68367a 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -141,7 +141,7 @@ public class TransportContext {
private TransportChannelHandler createChannelHandler(
Channel channel, BaseMessageHandler msgHandler) {
- TransportResponseHandler responseHandler = new
TransportResponseHandler(conf, channel);
+ TransportResponseHandler responseHandler = new
TransportResponseHandler(conf, channel, source);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler =
new TransportRequestHandler(channel, client, msgHandler);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index ddace8d87..aa57b4e0b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -26,11 +26,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.network.server.MessageHandler;
import org.apache.celeborn.common.network.util.NettyUtils;
@@ -62,23 +66,25 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
/** Records the time (in system nanoseconds) that the last fetch or RPC
request was sent. */
private final AtomicLong timeOfLastRequestNs;
- private final long pushTimeoutCheckerInterval;
+ private final AbstractSource source;
+
private static ScheduledExecutorService pushTimeoutChecker = null;
- private ScheduledFuture pushCheckerScheduleFuture;
+ private ScheduledFuture<?> pushCheckerScheduleFuture;
- private final long fetchTimeoutCheckerInterval;
private static ScheduledExecutorService fetchTimeoutChecker = null;
- private ScheduledFuture fetchCheckerScheduleFuture;
+ private ScheduledFuture<?> fetchCheckerScheduleFuture;
- public TransportResponseHandler(TransportConf conf, Channel channel) {
+ public TransportResponseHandler(
+ TransportConf conf, Channel channel, @Nullable AbstractSource source) {
this.conf = conf;
this.channel = channel;
this.outstandingFetches = JavaUtils.newConcurrentHashMap();
this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
this.outstandingPushes = JavaUtils.newConcurrentHashMap();
this.timeOfLastRequestNs = new AtomicLong(0);
- this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
- this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+ this.source = source;
+ long pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+ long fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
String module = conf.getModuleName();
boolean checkPushTimeout = false;
@@ -110,7 +116,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (checkPushTimeout) {
pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleWithFixedDelay(
- () -> failExpiredPushRequest(),
+ this::failExpiredPushRequest,
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
@@ -119,11 +125,13 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (checkFetchTimeout) {
fetchCheckerScheduleFuture =
fetchTimeoutChecker.scheduleWithFixedDelay(
- () -> failExpiredFetchRequest(),
+ this::failExpiredFetchRequest,
fetchTimeoutCheckerInterval,
fetchTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
}
+
+ registerMetrics();
}
public void failExpiredPushRequest() {
@@ -178,6 +186,14 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
}
}
+ private void registerMetrics() {
+ if (source != null) {
+ source.addGauge("OutstandingFetchCount", outstandingFetches::size);
+ source.addGauge("OutstandingRpcCount", outstandingRpcs::size);
+ source.addGauge("OutstandingPushCount", outstandingPushes::size);
+ }
+ }
+
public void addFetchRequest(StreamChunkSlice streamChunkSlice,
FetchRequestInfo info) {
updateTimeOfLastRequest();
if (outstandingFetches.containsKey(streamChunkSlice)) {
@@ -267,7 +283,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (numOutstandingRequests() > 0) {
// show the details of outstanding Fetches
if (logger.isDebugEnabled()) {
- if (outstandingFetches.size() > 0) {
+ if (!outstandingFetches.isEmpty()) {
for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e :
outstandingFetches.entrySet()) {
StreamChunkSlice key = e.getKey();
logger.debug("The channel is closed, but there is still
outstanding Fetch {}", key);
@@ -455,4 +471,9 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
streamChunkSlice);
}
}
+
+ @VisibleForTesting
+ public AbstractSource source() {
+ return source;
+ }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index c4409c050..bfa01b428 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -146,6 +146,39 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedGauges.asScala.toList
}
+ /**
+ * Gets the named gauge of the metric with the given name and no extra labels.
+ *
+ * Note: This method is exposed so that tests can check the value of a metric's gauge.
+ *
+ * @param name The metric name.
+ * @return The corresponding named gauge, or null if no gauge matches.
+ */
+ def getGauge(name: String): NamedGauge[_] = {
+ getGauge(name, Map.empty)
+ }
+
+ /**
+ * Gets the named gauge of the metric given the metric name and labels.
+ *
+ * Note: This method is exposed so that tests can check the value of a metric's gauge.
+ *
+ * @param name The metric name.
+ * @param labels The metric labels; the source's static labels are merged in before matching.
+ * @return The corresponding named gauge, or null if no gauge matches.
+ */
+ def getGauge(name: String, labels: Map[String, String] = Map.empty):
NamedGauge[_] = {
+ val labelString = MetricLabels.labelString(labels ++ staticLabels)
+ val iter = namedGauges.iterator()
+ while (iter.hasNext) {
+ val namedGauge = iter.next()
+ if (namedGauge.name.equals(name) &&
namedGauge.labelString.equals(labelString)) {
+ return namedGauge
+ }
+ }
+ null
+ }
+
protected def histograms(): List[NamedHistogram] = {
List.empty[NamedHistogram]
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
index 3dd32334f..cc781c1da 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
@@ -27,6 +27,7 @@ import io.netty.channel.local.LocalChannel;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -41,50 +42,40 @@ public class TransportResponseHandlerSuiteJ {
@Test
public void handleSuccessfulFetch() throws Exception {
StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
-
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
handler.handle(new ChunkFetchSuccess(streamChunkSlice, new
TestManagedBuffer(123)));
verify(callback, times(1)).onSuccess(eq(0), any());
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
}
@Test
public void handleFailedFetch() throws Exception {
StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
verify(callback, times(1)).onFailure(eq(0), any());
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
}
@Test
public void clearAllOutstandingRequests() throws Exception {
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
- assertEquals(3, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 3);
handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new
TestManagedBuffer(12)));
handler.exceptionCaught(new Exception("duh duh duhhhh"));
@@ -93,58 +84,49 @@ public class TransportResponseHandlerSuiteJ {
verify(callback, times(1)).onSuccess(eq(0), any());
verify(callback, times(1)).onFailure(eq(1), any());
verify(callback, times(1)).onFailure(eq(2), any());
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
}
@Test
public void handleSuccessfulRPC() throws Exception {
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.RPC_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.RPC_MODULE);
RpcResponseCallback callback = mock(RpcResponseCallback.class);
handler.addRpcRequest(12345, callback);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
// This response should be ignored.
handler.handle(new RpcResponse(54321, new
NioManagedBuffer(ByteBuffer.allocate(7))));
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
ByteBuffer resp = ByteBuffer.allocate(10);
handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
}
@Test
public void handleFailedRPC() throws Exception {
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.RPC_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.RPC_MODULE);
RpcResponseCallback callback = mock(RpcResponseCallback.class);
handler.addRpcRequest(12345, callback);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
handler.handle(new RpcFailure(12345, "oh no"));
verify(callback, times(1)).onFailure(any());
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
}
@Test
public void handleSuccessfulPush() throws Exception {
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
RpcResponseCallback callback = mock(RpcResponseCallback.class);
PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() +
30000, callback);
info.setChannelFuture(mock(ChannelFuture.class));
handler.addPushRequest(12345, info);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingPushCount", 1);
// This response should be ignored.
handler.handle(new RpcResponse(54321, new
NioManagedBuffer(ByteBuffer.allocate(7))));
@@ -153,26 +135,42 @@ public class TransportResponseHandlerSuiteJ {
ByteBuffer resp = ByteBuffer.allocate(10);
handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingPushCount", 0);
}
@Test
public void handleFailedPush() throws Exception {
- TransportResponseHandler handler =
- new TransportResponseHandler(
- Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
- new LocalChannel());
+ TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
RpcResponseCallback callback = mock(RpcResponseCallback.class);
PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() +
30000L, callback);
info.setChannelFuture(mock(ChannelFuture.class));
handler.addPushRequest(12345, info);
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingPushCount", 1);
handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
- assertEquals(1, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingPushCount", 1);
handler.handle(new RpcFailure(12345, "oh no"));
verify(callback, times(1)).onFailure(any());
- assertEquals(0, handler.numOutstandingRequests());
+ assertOutstandingRequests(handler, "OutstandingPushCount", 0);
+ }
+
+ private TransportResponseHandler createResponseHandler(String module) {
+ CelebornConf celebornConf = new CelebornConf();
+ return new TransportResponseHandler(
+ Utils.fromCelebornConf(celebornConf, module, 8),
+ new LocalChannel(),
+ new AbstractSource(celebornConf, "Worker") {
+ @Override
+ public String sourceName() {
+ return "worker";
+ }
+ });
+ }
+
+ private void assertOutstandingRequests(
+ TransportResponseHandler handler, String name, int expected) {
+ assertEquals(expected, handler.numOutstandingRequests());
+ assertEquals(expected, handler.source().getGauge(name).gauge().getValue());
}
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 54dfcf1ec..b799b5717 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -189,6 +189,12 @@ These metrics are exposed by Celeborn worker.
- PotentialConsumeSpeed
- UserProduceSpeed
- WorkerConsumeSpeed
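+ - OutstandingFetchCount
+ - The count of outstanding fetch requests that have not yet received a response.
+ - OutstandingRpcCount
+ - The count of outstanding rpc requests that have not yet received a response.
+ - OutstandingPushCount
+ - The count of outstanding push requests that have not yet received a response.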
- push_server_usedHeapMemory
- push_server_usedDirectMemory
- push_server_numAllocations