This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 410dcc04 [#796] bug:fix the issues of MetricReporter (#797)
410dcc04 is described below
commit 410dcc04940b1394766fe07a33bb0906a6da0972
Author: xianjingfeng <[email protected]>
AuthorDate: Fri Apr 7 11:12:41 2023 +0800
[#796] bug:fix the issues of MetricReporter (#797)
### What changes were proposed in this pull request?
Support custom config keys defined in plugins
Refactor the logic for load config file
Fix some issues of metricReporter.
### Why are the changes needed?
Metric reporter is unusable.
Fix: #796
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT and Manual testing
---
.../apache/uniffle/common/config/RssBaseConf.java | 18 ++++++------
.../org/apache/uniffle/common/config/RssConf.java | 32 ++++++++++++++++++++++
.../common/metrics/MetricReporterFactory.java | 2 +-
.../PrometheusPushGatewayMetricReporter.java | 2 +-
.../common/metrics/MetricReporterFactoryTest.java} | 31 +++++++++++----------
.../uniffle/coordinator/CoordinatorConf.java | 20 +-------------
.../uniffle/coordinator/CoordinatorServer.java | 3 ++
.../uniffle/coordinator/CoordinatorConfTest.java | 2 ++
coordinator/src/test/resources/coordinator.conf | 1 +
docs/coordinator_guide.md | 1 +
docs/server_guide.md | 1 +
.../org/apache/uniffle/server/ShuffleServer.java | 1 +
.../apache/uniffle/server/ShuffleServerConf.java | 26 +-----------------
.../uniffle/server/ShuffleServerConfTest.java | 4 ++-
server/src/test/resources/confTest.conf | 1 +
15 files changed, 74 insertions(+), 71 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 8dd74d34..823f9aa3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.RssUtils;
public class RssBaseConf extends RssConf {
@@ -230,22 +231,19 @@ public class RssBaseConf extends RssConf {
.intType()
.defaultValue(16)
.withDescription("start server service max retry");
+
+ public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>>
configOptions) {
+ Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
- public boolean loadCommonConf(Map<String, String> properties) {
if (properties == null) {
return false;
}
+ return loadCommonConf(properties) && loadConf(properties, configOptions,
true);
+ }
+ public boolean loadCommonConf(Map<String, String> properties) {
List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(RssBaseConf.class);
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
- });
-
- return true;
+ return loadConf(properties, configOptions, false);
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index f7f832a1..1f9da666 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -18,10 +18,12 @@
package org.apache.uniffle.common.config;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import com.google.common.collect.Sets;
@@ -600,6 +602,36 @@ public class RssConf implements Cloneable {
return getRawValue(configOption.key());
}
+ /**
+ * loadConf
+ * @param properties all config items in configration file
+ * @param configOptions the config items defined in base config class
+ * @param includeMissingKey if include the keys which not defined in base
config class
+ * @return true if load successfully, otherwise false
+ */
+ public boolean loadConf(
+ Map<String, String> properties,
+ List<ConfigOption<Object>> configOptions,
+ boolean includeMissingKey) {
+ if (properties == null || configOptions == null) {
+ return false;
+ }
+ Map<String, ConfigOption<Object>> configOptionMap =
+ configOptions.stream().collect(Collectors.toMap(c ->
c.key().toLowerCase(), c -> c));
+ properties.forEach((k, v) -> {
+ ConfigOption<Object> config = configOptionMap.get(k.toLowerCase());
+ if (config == null) {
+ // if the key is not defined in configOptions, set it as a string value
+ if (includeMissingKey) {
+ setString(k, v);
+ }
+ } else {
+ set(config, ConfigUtils.convertValue(v, config.getClazz()));
+ }
+ });
+ return true;
+ }
+
@Override
public int hashCode() {
int hash = 0;
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
index d66576df..d25a9527 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
@@ -33,7 +33,7 @@ public class MetricReporterFactory {
}
Class<?> klass = Class.forName(name);
Constructor<?> constructor;
- constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
+ constructor = klass.getConstructor(RssConf.class, String.class);
return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
index 107298e0..49a79503 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
@@ -68,7 +68,7 @@ public class PrometheusPushGatewayMetricReporter extends
AbstractMetricReporter
scheduledExecutorService.scheduleWithFixedDelay(() -> {
for (CollectorRegistry registry : registryList) {
try {
- pushGateway.push(registry, jobName, groupingKey);
+ pushGateway.pushAdd(registry, jobName, groupingKey);
} catch (Throwable e) {
LOG.error("Failed to send metrics to push gateway.", e);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
similarity index 58%
copy from
common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
copy to
common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
index d66576df..e3071a37 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java
+++
b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java
@@ -17,23 +17,26 @@
package org.apache.uniffle.common.metrics;
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
+import
org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MetricReporterFactoryTest {
+
+ @Test
+ public void testGetMetricReporter() throws Exception {
+ CustomRssConf conf = new CustomRssConf();
+ conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
+ PrometheusPushGatewayMetricReporter.class.getCanonicalName());
+ MetricReporter metricReporter =
MetricReporterFactory.getMetricReporter(conf, "1");
+ assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
+ }
+
+ class CustomRssConf extends RssConf {
-public class MetricReporterFactory {
-
- public static MetricReporter getMetricReporter(RssConf conf, String
instanceId) throws Exception {
- String name = conf.get(RssBaseConf.RSS_METRICS_REPORTER_CLASS);
- if (StringUtils.isEmpty(name)) {
- return null;
- }
- Class<?> klass = Class.forName(name);
- Constructor<?> constructor;
- constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
- return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 879dedff..9dfe7d33 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -18,13 +18,11 @@
package org.apache.uniffle.coordinator;
import java.util.List;
-import java.util.Map;
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
import
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
@@ -221,22 +219,6 @@ public class CoordinatorConf extends RssBaseConf {
}
public boolean loadConfFromFile(String fileName) {
- Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
- if (properties == null) {
- return false;
- }
-
- loadCommonConf(properties);
-
- List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(CoordinatorConf.class);
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
- });
- return true;
+ return loadConfFromFile(fileName,
ConfigUtils.getAllConfigOptions(CoordinatorConf.class));
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 7016c692..3fb185f4 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -105,6 +105,9 @@ public class CoordinatorServer extends ReconfigurableBase {
startReconfigureThread();
jettyServer.start();
server.start();
+ if (metricReporter != null) {
+ metricReporter.start();
+ }
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index 8f93f365..9f6e9dd0 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -46,6 +46,8 @@ public class CoordinatorConfTest {
assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE));
assertEquals(60 * 1000,
conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL));
+ // test custom keys defined in plugins
+ assertEquals("v1", conf.getString("plugin.custom.key", null));
}
}
diff --git a/coordinator/src/test/resources/coordinator.conf
b/coordinator/src/test/resources/coordinator.conf
index 4c881e64..d597ebb7 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1
rss.coordinator.access.loadChecker.serverNum.threshold 2
rss.coordinator.access.loadChecker.memory.percentage 20.0
rss.coordinator.dynamicClientConf.updateIntervalSec 1
+plugin.custom.key v1
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index d3815a04..ff8662d9 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -132,6 +132,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in
metrics reporter, whi
|Property Name|Default| Description
|
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL
including scheme, host name, and port.
|
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key
which is the group and global labels of all metrics. The label name and value
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2.
Please ensure that your grouping key meets the [Prometheus
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
|
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which
metrics will be pushed.
|
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4b3dbedd..0fb7af4e 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -105,6 +105,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in
metrics reporter, whi
|Property Name|Default| Description
|
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The
class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL
including scheme, host name, and port.
|
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key
which is the group and global labels of all metrics. The label name and value
are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2.
Please ensure that your grouping key meets the [Prometheus
requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).
|
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which
metrics will be pushed.
|
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index c7cf9484..e713649a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -302,6 +302,7 @@ public class ShuffleServer {
metricReporter.addCollectorRegistry(ShuffleServerMetrics.getCollectorRegistry());
metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry());
metricReporter.addCollectorRegistry(JvmMetrics.getCollectorRegistry());
+ metricReporter.start();
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 566e4e7a..2f05dc1a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.server;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -26,7 +25,6 @@ import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.RssUtils;
public class ShuffleServerConf extends RssBaseConf {
@@ -485,28 +483,6 @@ public class ShuffleServerConf extends RssBaseConf {
}
public boolean loadConfFromFile(String fileName) {
- Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
-
- if (properties == null) {
- return false;
- }
-
- loadCommonConf(properties);
-
- List<ConfigOption<Object>> configOptions =
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class);
-
- properties.forEach((k, v) -> {
- configOptions.forEach(config -> {
- if (config.key().equalsIgnoreCase(k)) {
- set(config, ConfigUtils.convertValue(v, config.getClazz()));
- }
- });
-
- if (k.startsWith(PREFIX_HADOOP_CONF)) {
- setString(k, v);
- }
- });
-
- return true;
+ return loadConfFromFile(fileName,
ConfigUtils.getAllConfigOptions(ShuffleServerConf.class));
}
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index ca707c79..4f9c81b2 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -61,8 +61,10 @@ public class ShuffleServerConfTest {
assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
assertEquals(2,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1",
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
- assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
+ assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b",
""));
assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
+ assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
+
}
@Test
diff --git a/server/src/test/resources/confTest.conf
b/server/src/test/resources/confTest.conf
index 69170b4f..6007fb90 100644
--- a/server/src/test/resources/confTest.conf
+++ b/server/src/test/resources/confTest.conf
@@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1
rss.server.had.a.b value2
rss.server.multistorage.enable true
rss.server.cluster.hadoop.clustere1.
+plugin.custom.key v1