This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new bcb591ef [#796][0.7] bug: Fix the issues of MetricReporter (#821)
bcb591ef is described below

commit bcb591efe3ce1a6ad74d9cde533b6489b2f25db0
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Apr 13 14:58:10 2023 +0800

    [#796][0.7] bug: Fix the issues of MetricReporter (#821)
    
    ### What changes were proposed in this pull request?
    1.Support custom config keys defined in plugins
    2.Refactor the logic for load config file
    3.Fix some issues of metricReporter.
    ### Why are the changes needed?
    Metric reporter is unusable.
    Fix: #796
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    UT and Manual testing
---
 .../apache/uniffle/common/config/RssBaseConf.java  | 17 +++++-------
 .../org/apache/uniffle/common/config/RssConf.java  | 32 ++++++++++++++++++++++
 .../common/metrics/MetricReporterFactory.java      |  2 +-
 .../PrometheusPushGatewayMetricReporter.java       |  2 +-
 .../common/metrics/MetricReporterFactoryTest.java} | 31 +++++++++++----------
 .../uniffle/coordinator/CoordinatorConf.java       | 20 +-------------
 .../uniffle/coordinator/CoordinatorServer.java     |  3 ++
 .../uniffle/coordinator/CoordinatorConfTest.java   |  2 ++
 coordinator/src/test/resources/coordinator.conf    |  1 +
 docs/coordinator_guide.md                          |  1 +
 docs/server_guide.md                               |  1 +
 .../org/apache/uniffle/server/ShuffleServer.java   |  3 ++
 .../apache/uniffle/server/ShuffleServerConf.java   | 26 +-----------------
 .../uniffle/server/ShuffleServerConfTest.java      |  5 ++--
 server/src/test/resources/confTest.conf            |  1 +
 15 files changed, 75 insertions(+), 72 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 761e5bf3..ae6bce0d 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.util.RssUtils;
 
 public class RssBaseConf extends RssConf {
 
@@ -212,21 +213,17 @@ public class RssBaseConf extends RssConf {
       .defaultValue(5L)
       .withDescription("Reconfigure check interval.");
 
-  public boolean loadCommonConf(Map<String, String> properties) {
+  public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> 
configOptions) {
+    Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
     if (properties == null) {
       return false;
     }
+    return loadCommonConf(properties) && loadConf(properties, configOptions, 
true);
+  }
 
+  public boolean loadCommonConf(Map<String, String> properties) {
     List<ConfigOption<Object>> configOptions = 
ConfigUtils.getAllConfigOptions(RssBaseConf.class);
-    properties.forEach((k, v) -> {
-      configOptions.forEach(config -> {
-        if (config.key().equalsIgnoreCase(k)) {
-          set(config, ConfigUtils.convertValue(v, config.getClazz()));
-        }
-      });
-    });
-
-    return true;
+    return loadConf(properties, configOptions, false);
   }
 
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index f7f832a1..1f9da666 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -18,10 +18,12 @@
 package org.apache.uniffle.common.config;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 
@@ -600,6 +602,36 @@ public class RssConf implements Cloneable {
     return getRawValue(configOption.key());
   }
 
+  /**
+   * loadConf
+   * @param properties all config items in configration file
+   * @param configOptions the config items defined in base config class
+   * @param includeMissingKey if include the keys which not defined in base 
config class
+   * @return true if load successfully, otherwise false
+   */
+  public boolean loadConf(
+      Map<String, String> properties,
+      List<ConfigOption<Object>> configOptions,
+      boolean includeMissingKey) {
+    if (properties == null || configOptions == null) {
+      return false;
+    }
+    Map<String, ConfigOption<Object>> configOptionMap =
+        configOptions.stream().collect(Collectors.toMap(c -> 
c.key().toLowerCase(), c -> c));
+    properties.forEach((k, v) -> {
+      ConfigOption<Object> config = configOptionMap.get(k.toLowerCase());
+      if (config == null) {
+        // if the key is not defined in configOptions, set it as a string value
+        if (includeMissingKey) {
+          setString(k, v);
+        }
+      } else {
+        set(config, ConfigUtils.convertValue(v, config.getClazz()));
+      }
+    });
+    return true;
+  }
+
   @Override
   public int hashCode() {
     int hash = 0;
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
index d66576df..d25a9527 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
@@ -33,7 +33,7 @@ public class MetricReporterFactory {
     }
     Class<?> klass = Class.forName(name);
     Constructor<?> constructor;
-    constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
+    constructor = klass.getConstructor(RssConf.class, String.class);
     return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
 
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
index 8795d5b6..1b55c886 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
@@ -68,7 +68,7 @@ public class PrometheusPushGatewayMetricReporter extends 
AbstractMetricReporter
     scheduledExecutorService.scheduleWithFixedDelay(() -> {
       for (CollectorRegistry registry : registryList) {
         try {
-          pushGateway.push(registry, jobName, groupingKey);
+          pushGateway.pushAdd(registry, jobName, groupingKey);
         } catch (Throwable e) {
           LOG.error("Failed to send metrics to push gateway.", e);
         }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
 
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
similarity index 58%
copy from 
common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
copy to 
common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
index d66576df..e3071a37 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
@@ -17,23 +17,26 @@
 
 package org.apache.uniffle.common.metrics;
 
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
+import 
org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MetricReporterFactoryTest {
+
+  @Test
+  public void testGetMetricReporter() throws Exception {
+    CustomRssConf conf = new CustomRssConf();
+    conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
+        PrometheusPushGatewayMetricReporter.class.getCanonicalName());
+    MetricReporter metricReporter = 
MetricReporterFactory.getMetricReporter(conf, "1");
+    assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
+  }
+
+  class CustomRssConf extends RssConf {
 
-public class MetricReporterFactory {
-
-  public static MetricReporter getMetricReporter(RssConf conf, String 
instanceId) throws Exception {
-    String name = conf.get(RssBaseConf.RSS_METRICS_REPORTER_CLASS);
-    if (StringUtils.isEmpty(name)) {
-      return null;
-    }
-    Class<?> klass = Class.forName(name);
-    Constructor<?> constructor;
-    constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
-    return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 6f2b9413..424d7d1e 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -18,13 +18,11 @@
 package org.apache.uniffle.coordinator;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.uniffle.common.config.ConfigOption;
 import org.apache.uniffle.common.config.ConfigOptions;
 import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
 import 
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
 import 
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
 
@@ -209,22 +207,6 @@ public class CoordinatorConf extends RssBaseConf {
   }
 
   public boolean loadConfFromFile(String fileName) {
-    Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
-    if (properties == null) {
-      return false;
-    }
-
-    loadCommonConf(properties);
-
-    List<ConfigOption<Object>> configOptions = 
ConfigUtils.getAllConfigOptions(CoordinatorConf.class);
-    properties.forEach((k, v) -> {
-      configOptions.forEach(config -> {
-        if (config.key().equalsIgnoreCase(k)) {
-          set(config, ConfigUtils.convertValue(v, config.getClazz()));
-        }
-      });
-    });
-    return true;
+    return loadConfFromFile(fileName, 
ConfigUtils.getAllConfigOptions(CoordinatorConf.class));
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index a016df6a..fa648983 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -101,6 +101,9 @@ public class CoordinatorServer extends ReconfigurableBase {
     startReconfigureThread();
     jettyServer.start();
     server.start();
+    if (metricReporter != null) {
+      metricReporter.start();
+    }
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index 3d06f79f..0ece6b82 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -46,6 +46,8 @@ public class CoordinatorConfTest {
     assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE));
     assertEquals(60 * 1000, 
conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL));
 
+    // test custom keys defined in plugins
+    assertEquals("v1", conf.getString("plugin.custom.key", null));
   }
 
 }
diff --git a/coordinator/src/test/resources/coordinator.conf 
b/coordinator/src/test/resources/coordinator.conf
index a29bfdfb..82e6889c 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1
 rss.coordinator.access.loadChecker.serverNum.threshold 2
 rss.coordinator.access.loadChecker.memory.percentage 20.0
 rss.coordinator.dynamicClientConf.updateIntervalSec 1
+plugin.custom.key v1
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index d53a5b37..dc7f1975 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -126,6 +126,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in 
metrics reporter, whi
 
 |Property Name|Default|        Description                                     
                                                                                
                                                                                
                                                                                
                                     |
 
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
 class of metrics reporter.|
 |rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL 
including scheme, host name, and port.                                          
                                                                                
                                                                                
                                                      |
 |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key 
which is the group and global labels of all metrics. The label name and value 
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. 
Please ensure that your grouping key meets the [Prometheus 
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
 |
 |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which 
metrics will be pushed.                                                         
                                                                                
                                                                                
                                                             |
diff --git a/docs/server_guide.md b/docs/server_guide.md
index e3f9a99c..11f6782d 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -98,6 +98,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in 
metrics reporter, whi
 
 |Property Name|Default|        Description                                     
                                                                                
                                                                                
                                                                                
                                     |
 
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
 class of metrics reporter.|
 |rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL 
including scheme, host name, and port.                                          
                                                                                
                                                                                
                                                      |
 |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key 
which is the group and global labels of all metrics. The label name and value 
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. 
Please ensure that your grouping key meets the [Prometheus 
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
 |
 |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which 
metrics will be pushed.                                                         
                                                                                
                                                                                
                                                             |
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 796ba305..e1058dd8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -110,6 +110,9 @@ public class ShuffleServer {
     registerHeartBeat.startHeartBeat();
     jettyServer.start();
     server.start();
+    if (metricReporter != null) {
+      metricReporter.start();
+    }
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5b181075..fc2f8024 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.server;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -26,7 +25,6 @@ import org.apache.uniffle.common.config.ConfigOption;
 import org.apache.uniffle.common.config.ConfigOptions;
 import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
 
 public class ShuffleServerConf extends RssBaseConf {
 
@@ -380,28 +378,6 @@ public class ShuffleServerConf extends RssBaseConf {
   }
 
   public boolean loadConfFromFile(String fileName) {
-    Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
-    if (properties == null) {
-      return false;
-    }
-
-    loadCommonConf(properties);
-
-    List<ConfigOption<Object>> configOptions = 
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class);
-
-    properties.forEach((k, v) -> {
-      configOptions.forEach(config -> {
-        if (config.key().equalsIgnoreCase(k)) {
-          set(config, ConfigUtils.convertValue(v, config.getClazz()));
-        }
-      });
-
-      if (k.startsWith(PREFIX_HADOOP_CONF)) {
-        setString(k, v);
-      }
-    });
-
-    return true;
+    return loadConfFromFile(fileName, 
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class));
   }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index f70ca342..07f180a5 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -61,8 +61,9 @@ public class ShuffleServerConfTest {
     assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
     assertEquals(2, 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
     assertEquals("value1", 
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
-    assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
-    assertEquals("GRPC", 
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+    assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", 
""));
+    assertEquals("GRPC", 
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE));
+    assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
   }
 
   @Test
diff --git a/server/src/test/resources/confTest.conf 
b/server/src/test/resources/confTest.conf
index 69170b4f..6007fb90 100644
--- a/server/src/test/resources/confTest.conf
+++ b/server/src/test/resources/confTest.conf
@@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1
 rss.server.had.a.b value2
 rss.server.multistorage.enable true
 rss.server.cluster.hadoop.clustere1.
+plugin.custom.key v1

Reply via email to