This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c19df2ba [ISSUE-388][ISSUE-244][Bug] Fix incorrect usage of `GRPCMetrics#setGauge` (#404)
c19df2ba is described below

commit c19df2ba51b5af923627aa10f267e0ca2f6afd3b
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Dec 13 12:01:40 2022 +0800

    [ISSUE-388][ISSUE-244][Bug] Fix incorrect usage of `GRPCMetrics#setGauge` (#404)
    
    ### What changes were proposed in this pull request?
    Fix incorrect usage of `GRPCMetrics#setGauge`
    
    ### Why are the changes needed?
    This fixes the bugs reported in #244 and #388.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need
---
 .../apache/uniffle/common/metrics/GRPCMetrics.java | 26 ++++++++++++++++++++++
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |  6 ++---
 .../rpc/MonitoringServerTransportFilter.java       |  7 ++----
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index c693604d..62972968 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -80,6 +80,32 @@ public abstract class GRPCMetrics {
     }
   }
 
+  public void incGauge(String tag) {
+    incGauge(tag, 1);
+  }
+
+  public void incGauge(String tag, double value) {
+    if (isRegistered) {
+      Gauge gauge = gaugeMap.get(tag);
+      if (gauge != null) {
+        gauge.inc(value);
+      }
+    }
+  }
+
+  public void decGauge(String tag) {
+    decGauge(tag, 1);
+  }
+
+  public void decGauge(String tag, double value) {
+    if (isRegistered) {
+      Gauge gauge = gaugeMap.get(tag);
+      if (gauge != null) {
+        gauge.dec(value);
+      }
+    }
+  }
+
   public void incCounter(String methodName) {
     if (isRegistered) {
       Gauge gauge = gaugeMap.get(methodName);
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 2283a662..be4e916f 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -99,8 +99,7 @@ public class GrpcServer implements ServerInterface {
 
     @Override
     protected void beforeExecute(Thread t, Runnable r) {
-      grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
-          activeThreadSize.incrementAndGet());
+      grpcMetrics.incGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY);
       grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
           getQueue().size());
       super.beforeExecute(t, r);
@@ -108,8 +107,7 @@ public class GrpcServer implements ServerInterface {
 
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
-      grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
-          activeThreadSize.decrementAndGet());
+      grpcMetrics.decGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY);
       grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
           getQueue().size());
       super.afterExecute(r, t);
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
index 085b13fd..2c29dd27 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
@@ -17,8 +17,6 @@
 
 package org.apache.uniffle.common.rpc;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import io.grpc.Attributes;
 import io.grpc.ServerTransportFilter;
 
@@ -27,7 +25,6 @@ import org.apache.uniffle.common.metrics.GRPCMetrics;
 import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
 
 public class MonitoringServerTransportFilter extends ServerTransportFilter {
-  private final AtomicLong connectionSize = new AtomicLong(0);
   private final GRPCMetrics grpcMetrics;
 
   public MonitoringServerTransportFilter(GRPCMetrics grpcMetrics) {
@@ -35,12 +32,12 @@ public class MonitoringServerTransportFilter extends ServerTransportFilter {
   }
 
   public Attributes transportReady(Attributes transportAttrs) {
-    grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet());
+    grpcMetrics.incGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY);
     return super.transportReady(transportAttrs);
   }
 
   public void transportTerminated(Attributes transportAttrs) {
-    grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet());
+    grpcMetrics.decGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY);
     super.transportTerminated(transportAttrs);
   }
 }

Reply via email to