This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9635d7a0 [ISSUE-169] Support metric reporter and Support promethues 
push gateway (#415)
9635d7a0 is described below

commit 9635d7a03551f36c6858e9779ae50082b5f90033
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Dec 21 19:32:49 2022 +0800

    [ISSUE-169] Support metric reporter and Support promethues push gateway 
(#415)
    
    ### What changes were proposed in this pull request?
    Support a metric reporter and the Prometheus push gateway
    
    ### Why are the changes needed?
    Users can implement a custom export plugin to export metrics to their monitoring system. #169
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
---
 common/pom.xml                                     |   4 +
 .../apache/uniffle/common/config/RssBaseConf.java  |   6 ++
 .../common/metrics/AbstractMetricReporter.java     |  41 +++++++
 .../uniffle/common/metrics/MetricReporter.java     |  29 +++++
 .../common/metrics/MetricReporterFactory.java      |  39 +++++++
 .../PrometheusPushGatewayMetricReporter.java       | 120 +++++++++++++++++++++
 .../PrometheusPushGatewayMetricReporterTest.java   | 114 ++++++++++++++++++++
 .../uniffle/coordinator/CoordinatorServer.java     |  25 ++++-
 docs/coordinator_guide.md                          |  11 ++
 docs/server_guide.md                               |  14 ++-
 pom.xml                                            |   6 +-
 .../org/apache/uniffle/server/ShuffleServer.java   |  16 ++-
 12 files changed, 421 insertions(+), 4 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index aec42750..e12db315 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -60,6 +60,10 @@
       <groupId>io.prometheus</groupId>
       <artifactId>simpleclient_servlet</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_pushgateway</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index ec28417f..1d794fb3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -199,6 +199,12 @@ public class RssBaseConf extends RssConf {
       .defaultValue(false)
       .withDescription("Whether enable test mode for the shuffle server.");
 
+  public static final ConfigOption<String> RSS_METRICS_REPORTER_CLASS = 
ConfigOptions
+      .key("rss.metrics.reporter.class")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The class of metrics reporter.");
+
   public boolean loadCommonConf(Map<String, String> properties) {
     if (properties == null) {
       return false;
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java
 
b/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java
new file mode 100644
index 00000000..58f035c5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.prometheus.client.CollectorRegistry;
+
+import org.apache.uniffle.common.config.RssConf;
+
+/**
+ * Base {@link MetricReporter} that stores the configuration, the instance id and the collector
+ * registries handed to it; {@code start()} and {@code stop()} are left to subclasses.
+ */
+public abstract class AbstractMetricReporter implements MetricReporter {
+  /** Registries added through {@link #addCollectorRegistry(CollectorRegistry)}, in call order. */
+  protected List<CollectorRegistry> registryList = new ArrayList<>();
+  /** Configuration passed to the constructor. */
+  protected final RssConf conf;
+  /** Instance identifier passed to the constructor. */
+  protected final String instanceId;
+
+  /**
+   * Stores the given arguments for use by subclasses.
+   *
+   * @param conf configuration exposed to subclasses as {@link #conf}
+   * @param instanceId identifier exposed to subclasses as {@link #instanceId}
+   */
+  public AbstractMetricReporter(RssConf conf, String instanceId) {
+    this.instanceId = instanceId;
+    this.conf = conf;
+  }
+
+  /**
+   * Appends the registry to {@link #registryList}.
+   *
+   * @param registry the registry to remember
+   */
+  @Override
+  public void addCollectorRegistry(CollectorRegistry registry) {
+    this.registryList.add(registry);
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java
new file mode 100644
index 00000000..81ca52bf
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+
+/**
+ * Contract for a metric reporter: it can be started, stopped, and given
+ * {@link CollectorRegistry} instances to work with.
+ */
+public interface MetricReporter {
+
+  /** Starts the reporter. */
+  void start();
+
+  /** Stops the reporter. */
+  void stop();
+
+  /**
+   * Hands a collector registry to this reporter.
+   *
+   * @param registry the registry to add
+   */
+  void addCollectorRegistry(CollectorRegistry registry);
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
new file mode 100644
index 00000000..d66576df
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+
+/**
+ * Creates the {@link MetricReporter} whose class name is configured under
+ * {@link RssBaseConf#RSS_METRICS_REPORTER_CLASS}.
+ */
+public class MetricReporterFactory {
+
+  /**
+   * Reflectively instantiates the configured reporter class through its public
+   * {@code (RssConf, String)} constructor.
+   *
+   * @param conf configuration holding the reporter class name; also passed to the reporter
+   * @param instanceId identifier passed to the reporter's constructor
+   * @return the reporter, or {@code null} when no reporter class is configured
+   * @throws Exception if the class cannot be loaded, has no public {@code (RssConf, String)}
+   *     constructor, cannot be instantiated, or does not implement {@link MetricReporter}
+   */
+  public static MetricReporter getMetricReporter(RssConf conf, String instanceId) throws Exception {
+    String name = conf.get(RssBaseConf.RSS_METRICS_REPORTER_CLASS);
+    if (StringUtils.isEmpty(name)) {
+      return null;
+    }
+    Class<?> klass = Class.forName(name);
+    // Look the constructor up by its declared parameter types. getConstructor() matches types
+    // exactly, so using conf.getClass() fails with NoSuchMethodException whenever a subclass
+    // of RssConf is passed in.
+    Constructor<?> constructor = klass.getConstructor(RssConf.class, String.class);
+    // Cast to the interface so reporters that do not extend AbstractMetricReporter are accepted.
+    return (MetricReporter) constructor.newInstance(conf, instanceId);
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
 
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
new file mode 100644
index 00000000..8795d5b6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics.prometheus;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.AbstractMetricReporter;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * {@link AbstractMetricReporter} that periodically pushes every registered
+ * {@link CollectorRegistry} to a Prometheus Pushgateway.
+ */
+public class PrometheusPushGatewayMetricReporter extends AbstractMetricReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayMetricReporter.class);
+  /** Config key: address of the Pushgateway. */
+  static final String PUSHGATEWAY_ADDR = "rss.metrics.prometheus.pushgateway.addr";
+  /** Config key: extra grouping-key labels, formatted as {@code k1=v1;k2=v2}. */
+  static final String GROUPING_KEY = "rss.metrics.prometheus.pushgateway.groupingkey";
+  /** Config key: job name used for every push. */
+  static final String JOB_NAME = "rss.metrics.prometheus.pushgateway.jobname";
+  /** Config key: delay between two pushes, in seconds (defaults to 10). */
+  static final String REPORT_INTEVAL = "rss.metrics.prometheus.pushgateway.report.interval.seconds";
+  private ScheduledExecutorService scheduledExecutorService;
+  private PushGateway pushGateway;
+
+  /**
+   * @param conf configuration holding the pushgateway settings
+   * @param instanceId value of the {@code instance} label added to the grouping key
+   */
+  public PrometheusPushGatewayMetricReporter(RssConf conf, String instanceId) {
+    super(conf, instanceId);
+  }
+
+  /**
+   * Starts a single-threaded scheduler that pushes all registered registries immediately and
+   * then again after every report interval. Does nothing if the reporter is already running.
+   *
+   * @throws RuntimeException if the job name is empty, or if no gateway has been injected and
+   *     the pushgateway address is empty
+   */
+  @Override
+  public void start() {
+    if (scheduledExecutorService != null && !scheduledExecutorService.isShutdown()) {
+      // Starting again would leak the running scheduler and double the pushes.
+      LOG.warn("PrometheusPushGatewayMetricReporter is already started.");
+      return;
+    }
+    if (pushGateway == null) {
+      String address = conf.getString(PUSHGATEWAY_ADDR, null);
+      if (StringUtils.isEmpty(address)) {
+        throw new RuntimeException(PUSHGATEWAY_ADDR + " should not be empty!");
+      }
+      pushGateway = new PushGateway(address);
+    }
+    String jobName = conf.getString(JOB_NAME, null);
+    if (StringUtils.isEmpty(jobName)) {
+      throw new RuntimeException(JOB_NAME + " should not be empty!");
+    }
+    Map<String, String> groupingKey = parseGroupingKey(conf.getString(GROUPING_KEY, ""));
+    // The instance label always wins over a user supplied "instance" entry.
+    groupingKey.put("instance", instanceId);
+    int reportInterval = conf.getInteger(REPORT_INTEVAL, 10);
+    scheduledExecutorService = Executors.newScheduledThreadPool(1,
+        ThreadUtils.getThreadFactory("PrometheusPushGatewayMetricReporter-%d"));
+    scheduledExecutorService.scheduleWithFixedDelay(() -> {
+      for (CollectorRegistry registry : registryList) {
+        try {
+          pushGateway.push(registry, jobName, groupingKey);
+        } catch (Throwable e) {
+          // Deliberately broad: a failure escaping the task would suppress all later runs.
+          LOG.error("Failed to send metrics to push gateway.", e);
+        }
+      }
+    }, 0, reportInterval, TimeUnit.SECONDS);
+  }
+
+  /** Shuts the scheduler down, if one was started. */
+  @Override
+  public void stop() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+    }
+  }
+
+  /** Injects the gateway to push to; when set before {@link #start()}, the address is not read. */
+  @VisibleForTesting
+  void setPushGateway(PushGateway pushGateway) {
+    this.pushGateway = pushGateway;
+  }
+
+  /**
+   * Parses a {@code k1=v1;k2=v2} string into a mutable label map. Entries without {@code '='}
+   * or with an empty key or value are logged and skipped.
+   *
+   * @param groupingKeyConfig the configured grouping key; {@code null} or empty yields an empty map
+   * @return a new mutable map holding the valid labels
+   */
+  static Map<String, String> parseGroupingKey(final String groupingKeyConfig) {
+    Map<String, String> groupingKey = new HashMap<>();
+    if (StringUtils.isEmpty(groupingKeyConfig)) {
+      return groupingKey;
+    }
+    for (String kv : groupingKeyConfig.split(";")) {
+      int idx = kv.indexOf('=');
+      if (idx < 0) {
+        LOG.warn("Invalid prometheusPushGateway groupingKey:{}, will be ignored", kv);
+        continue;
+      }
+
+      String labelKey = kv.substring(0, idx);
+      String labelValue = kv.substring(idx + 1);
+      if (StringUtils.isEmpty(labelKey) || StringUtils.isEmpty(labelValue)) {
+        LOG.warn(
+            "Invalid groupingKey {labelKey:{}, labelValue:{}} must not be empty",
+            labelKey,
+            labelValue);
+        continue;
+      }
+      groupingKey.put(labelKey, labelValue);
+    }
+    return groupingKey;
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java
 
b/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java
new file mode 100644
index 00000000..56788b1e
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics.prometheus;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.exporter.PushGateway;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.metrics.MetricReporter;
+import org.apache.uniffle.common.metrics.MetricReporterFactory;
+import org.apache.uniffle.common.metrics.MetricsManager;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link PrometheusPushGatewayMetricReporter}. */
+public class PrometheusPushGatewayMetricReporterTest {
+
+  /** Well-formed entries are parsed into key/value labels. */
+  @Test
+  public void testParseGroupingKey() {
+    Map<String, String> groupingKey =
+        PrometheusPushGatewayMetricReporter.parseGroupingKey("k1=v1;k2=v2");
+    assertNotNull(groupingKey);
+    assertEquals("v1", groupingKey.get("k1"));
+    assertEquals("v2", groupingKey.get("k2"));
+  }
+
+  /** Entries with a missing key, value or separator are dropped. */
+  @Test
+  public void testParseIncompleteGroupingKey() {
+    Map<String, String> groupingKey =
+        PrometheusPushGatewayMetricReporter.parseGroupingKey("k1=");
+    assertTrue(groupingKey.isEmpty());
+
+    groupingKey = PrometheusPushGatewayMetricReporter.parseGroupingKey("=v1");
+    assertTrue(groupingKey.isEmpty());
+
+    groupingKey = PrometheusPushGatewayMetricReporter.parseGroupingKey("k1");
+    assertTrue(groupingKey.isEmpty());
+  }
+
+  /** The reporter built by the factory pushes with the configured job name and grouping key. */
+  @Test
+  public void test() throws Exception {
+    RssConf conf = new RssConf();
+    conf.setString(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
+        PrometheusPushGatewayMetricReporter.class.getCanonicalName());
+    conf.setString(PrometheusPushGatewayMetricReporter.PUSHGATEWAY_ADDR, "");
+    conf.setString(PrometheusPushGatewayMetricReporter.GROUPING_KEY, "a=1;b=2");
+    String jobName = "jobname";
+    conf.setString(PrometheusPushGatewayMetricReporter.JOB_NAME, jobName);
+    String instanceId = "127.0.0.1-19999";
+    MetricReporter metricReporter = MetricReporterFactory.getMetricReporter(conf, instanceId);
+    assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
+    MetricsManager metricsManager = new MetricsManager();
+    CollectorRegistry collectorRegistry = metricsManager.getCollectorRegistry();
+    metricReporter.addCollectorRegistry(collectorRegistry);
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    Counter counter1 = metricsManager.addCounter("counter1");
+    counter1.inc();
+    // The callback runs on the reporter thread, where the reporter catches and logs every
+    // Throwable. Capture the first failure so it can be rethrown on the test thread.
+    final Throwable[] failure = new Throwable[1];
+    PushGateway pushGateway = new CustomPushGateway((registry, job, groupingKey) -> {
+      if (countDownLatch.getCount() == 0) {
+        // Only the first push is verified.
+        return;
+      }
+      try {
+        assertEquals(jobName, job);
+        assertEquals(3, groupingKey.size());
+        assertEquals(instanceId, groupingKey.get("instance"));
+        assertEquals(1, counter1.get());
+      } catch (Throwable t) {
+        failure[0] = t;
+      } finally {
+        // Count down last so the write to failure[0] is visible after await() returns.
+        countDownLatch.countDown();
+      }
+    });
+    ((PrometheusPushGatewayMetricReporter) metricReporter).setPushGateway(pushGateway);
+    metricReporter.start();
+    boolean pushed;
+    try {
+      pushed = countDownLatch.await(20, TimeUnit.SECONDS);
+    } finally {
+      metricReporter.stop();
+    }
+    assertTrue(pushed, "No metrics were pushed within 20 seconds");
+    if (failure[0] != null) {
+      throw new AssertionError("Push was called with unexpected arguments", failure[0]);
+    }
+  }
+
+  /** {@link PushGateway} that forwards each push to a callback instead of sending it. */
+  static class CustomPushGateway extends PushGateway {
+
+    private final CustomCallback<CollectorRegistry, String, Map<String, String>> callback;
+
+    CustomPushGateway(CustomCallback<CollectorRegistry, String, Map<String, String>> callback) {
+      super("localhost");
+      this.callback = callback;
+    }
+
+    @Override
+    public void push(CollectorRegistry registry, String job, Map<String, String> groupingKey) throws IOException {
+      callback.apply(registry, job, groupingKey);
+    }
+  }
+
+  /** Three-argument callback with no return value. */
+  @FunctionalInterface
+  interface CustomCallback<P1, P2, P3> {
+    void apply(P1 p1, P2 p2, P3 p3);
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index f93c4335..e1a126be 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -26,9 +26,12 @@ import picocli.CommandLine;
 import org.apache.uniffle.common.Arguments;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
+import org.apache.uniffle.common.metrics.MetricReporter;
+import org.apache.uniffle.common.metrics.MetricReporterFactory;
 import org.apache.uniffle.common.rpc.ServerInterface;
 import org.apache.uniffle.common.security.SecurityConfig;
 import org.apache.uniffle.common.security.SecurityContextFactory;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.web.CommonMetricsServlet;
 import org.apache.uniffle.common.web.JettyServer;
 import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
@@ -58,6 +61,8 @@ public class CoordinatorServer {
   private AccessManager accessManager;
   private ApplicationManager applicationManager;
   private GRPCMetrics grpcMetrics;
+  private MetricReporter metricReporter;
+  private String id;
 
   public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
     this.coordinatorConf = coordinatorConf;
@@ -117,11 +122,22 @@ public class CoordinatorServer {
     if (clientConfManager != null) {
       clientConfManager.close();
     }
+    if (metricReporter != null) {
+      metricReporter.stop();
+      LOG.info("Metric Reporter Stopped!");
+    }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
   }
 
   private void initialization() throws Exception {
+    String ip = RssUtils.getHostIp();
+    if (ip == null) {
+      throw new RuntimeException("Couldn't acquire host Ip");
+    }
+    int port = coordinatorConf.getInteger(CoordinatorConf.RPC_SERVER_PORT);
+    id = ip + "-" + port;
+    LOG.info("Start to initialize coordinator {}", id);
     jettyServer = new JettyServer(coordinatorConf);
     // register metrics first to avoid NPE problem when add dynamic metrics
     registerMetrics();
@@ -153,7 +169,7 @@ public class CoordinatorServer {
     server = coordinatorFactory.getServer();
   }
 
-  private void registerMetrics() {
+  private void registerMetrics() throws Exception {
     LOG.info("Register metrics");
     CollectorRegistry coordinatorCollectorRegistry = new 
CollectorRegistry(true);
     CoordinatorMetrics.register(coordinatorCollectorRegistry);
@@ -182,6 +198,13 @@ public class CoordinatorServer {
     jettyServer.addServlet(
         new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
         "/prometheus/metrics/jvm");
+
+    metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf,  
id);
+    if (metricReporter != null) {
+      
metricReporter.addCollectorRegistry(CoordinatorMetrics.getCollectorRegistry());
+      metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry());
+      metricReporter.addCollectorRegistry(JvmMetrics.getCollectorRegistry());
+    }
   }
 
   public ClusterManager getClusterManager() {
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index da02a579..ddc7f4dd 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -105,6 +105,7 @@ This document will introduce how to deploy Uniffle 
coordinators.
 |rss.coordinator.quota.update.interval|60000|Update interval for the default 
number of submitted apps per user.|
 |rss.coordinator.quota.default.path|-|A configuration file for the number of 
apps for a user-defined user.|
 |rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
+|rss.metrics.reporter.class|-|The class of metrics reporter.|
 
 ### AccessClusterLoadChecker settings
 |Property Name|Default|        Description|
@@ -118,3 +119,13 @@ AccessCandidatesChecker is one of the built-in access 
checker, which will allow
 |---|---|---|
 |rss.coordinator.access.candidates.updateIntervalSec|120|Accessed candidates 
update interval in seconds, which is only valid when AccessCandidatesChecker is 
enabled.|
 |rss.coordinator.access.candidates.path|-|Accessed candidates file path, the 
file can be stored on HDFS|
+
+### PrometheusPushGatewayMetricReporter settings
+PrometheusPushGatewayMetricReporter is one of the built-in metrics reporters. It allows users to push metrics to a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway), which can then be scraped by Prometheus.
+
+|Property Name|Default|        Description                                     
                                                                                
                                                                                
                                                                                
                                     |
+|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL 
including scheme, host name, and port.                                          
                                                                                
                                                                                
                                                      |
+|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key 
which is the group and global labels of all metrics. The label name and value 
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. 
Please ensure that your grouping key meets the [Prometheus 
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
 |
+|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which 
metrics will be pushed.                                                         
                                                                                
                                                                                
                                                             |
+|rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval 
in seconds for the reporter to report metrics.                                  
                                                                                
                                                                                
                                                                                
   |
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 17cb7875..357aa94d 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -84,4 +84,16 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 |rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If 
it's negative, it will use the default disk whole space|
 |rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for 
`MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If 
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` 
will be used.|
 |rss.server.leak.shuffledata.check.interval|3600000|The interval of leak 
shuffle data check (ms)|
-|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
\ No newline at end of file
+|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
+|rss.metrics.reporter.class|-|The class of metrics reporter.|
+
+
+### PrometheusPushGatewayMetricReporter settings
+PrometheusPushGatewayMetricReporter is one of the built-in metrics reporters. It allows users to push metrics to a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway), which can then be scraped by Prometheus.
+
+|Property Name|Default|        Description                                     
                                                                                
                                                                                
                                                                                
                                     |
+|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL 
including scheme, host name, and port.                                          
                                                                                
                                                                                
                                                      |
+|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key 
which is the group and global labels of all metrics. The label name and value 
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. 
Please ensure that your grouping key meets the [Prometheus 
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
 |
+|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which 
metrics will be pushed.                                                         
                                                                                
                                                                                
                                                             |
+|rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval 
in seconds for the reporter to report metrics.                                  
                                                                                
                                                                                
                                                                                
   |
diff --git a/pom.xml b/pom.xml
index dc4f34a6..18189d79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -501,7 +501,11 @@
         <artifactId>simpleclient_servlet</artifactId>
         <version>${prometheus.simpleclient.version}</version>
       </dependency>
-
+      <dependency>
+        <groupId>io.prometheus</groupId>
+        <artifactId>simpleclient_pushgateway</artifactId>
+        <version>${prometheus.simpleclient.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.awaitility</groupId>
         <artifactId>awaitility</artifactId>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 56e1a68a..e4d09b51 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -34,6 +34,8 @@ import picocli.CommandLine;
 import org.apache.uniffle.common.Arguments;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
+import org.apache.uniffle.common.metrics.MetricReporter;
+import org.apache.uniffle.common.metrics.MetricReporterFactory;
 import org.apache.uniffle.common.rpc.ServerInterface;
 import org.apache.uniffle.common.security.SecurityConfig;
 import org.apache.uniffle.common.security.SecurityContextFactory;
@@ -75,6 +77,7 @@ public class ShuffleServer {
   private Set<String> tags = Sets.newHashSet();
   private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
+  private MetricReporter metricReporter;
 
   public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
     this.shuffleServerConf = shuffleServerConf;
@@ -140,6 +143,10 @@ public class ShuffleServer {
       healthCheck.stop();
       LOG.info("HealthCheck stopped!");
     }
+    if (metricReporter != null) {
+      metricReporter.stop();
+      LOG.info("Metric Reporter Stopped!");
+    }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
     LOG.info("RPC Server Stopped!");
@@ -207,7 +214,7 @@ public class ShuffleServer {
     LOG.info("Server tags: {}", tags);
   }
 
-  private void registerMetrics() {
+  private void registerMetrics() throws Exception {
     LOG.info("Register metrics");
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
     ShuffleServerMetrics.register(shuffleServerCollectorRegistry);
@@ -236,6 +243,13 @@ public class ShuffleServer {
     jettyServer.addServlet(
         new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
         "/prometheus/metrics/jvm");
+
+    metricReporter = 
MetricReporterFactory.getMetricReporter(shuffleServerConf, id);
+    if (metricReporter != null) {
+      
metricReporter.addCollectorRegistry(ShuffleServerMetrics.getCollectorRegistry());
+      metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry());
+      metricReporter.addCollectorRegistry(JvmMetrics.getCollectorRegistry());
+    }
   }
 
   /**

Reply via email to