This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new d1d6d65c8 [CELEBORN-2045] Add logger sinks to allow persist metrics
data and avoid possible worker OOM
d1d6d65c8 is described below
commit d1d6d65c8aefe1bfe6e212261f9b18e21105f076
Author: mingji <[email protected]>
AuthorDate: Thu Jun 26 18:42:20 2025 -0700
[CELEBORN-2045] Add logger sinks to allow persist metrics data and avoid
possible worker OOM
### What changes were proposed in this pull request?
1. Add a new sink and allow the user to store metrics to files.
2. Celeborn will scrape its metrics periodically to make sure that the
metric data won't be too large to cause OOM.
### Why are the changes needed?
A long-running worker ran out of memory, and the heap dump showed that the
metrics data was huge.
As you can see below, the biggest object is the time metric queue, which
held 1.6 million records.
<img width="1516" alt="Screenshot 2025-06-24 at 09 59 30"
src="https://github.com/user-attachments/assets/691c7bc2-b974-4cc0-8d5a-bf626ab903c0"
/>
<img width="1239" alt="Screenshot 2025-06-24 at 14 45 10"
src="https://github.com/user-attachments/assets/ebdf5a4d-c941-4f1e-911f-647aa156b37a"
/>
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster.
Closes #3346 from FMX/b2045.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 7a0eee332a549d3ca272470decd37169c19f25b6)
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 24 +++++
.../common/metrics/source/AbstractSource.scala | 2 +
conf/log4j2.xml.template | 20 ++++
conf/metrics.properties.template | 1 +
docs/configuration/metrics.md | 2 +
docs/migration.md | 7 +-
docs/monitoring.md | 7 ++
.../celeborn/common/metrics/MetricsSystem.scala | 9 +-
.../celeborn/common/metrics/sink/LoggerSink.scala | 62 ++++++++++++
.../src/test/resources/metrics2.properties | 2 +-
.../common/metrics/sink/LoggerSinkSuite.scala | 107 +++++++++++++++++++++
11 files changed, 240 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 10bce7768..20f6d09cd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -900,6 +900,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
def metricsWorkerAppLevelEnabled: Boolean =
get(METRICS_WORKER_APP_LEVEL_ENABLED)
+ def metricsLoggerSinkScrapeInterval: Long =
get(METRICS_LOGGERSINK_SCRAPE_INTERVAL)
+ def metricsLoggerSinkScrapeOutputEnabled: Boolean =
get(METRICS_LOGGERSINK_SCRAPE_OUTPUT_ENABLED)
// //////////////////////////////////////////////////////
// Quota //
@@ -5637,6 +5639,28 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val METRICS_LOGGERSINK_SCRAPE_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.metrics.loggerSink.scrape.interval")
+ .categories("metrics")
+ .version("0.6.0")
+ .doc("The interval of logger sink to scrape its own metrics. " +
+ "This config will have effect if you enabled logger sink. " +
+ "If you will not scrape metrics periodically, " +
+ "do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to
metrics.properties.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("30min")
+
+ val METRICS_LOGGERSINK_SCRAPE_OUTPUT_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.metrics.loggerSink.output.enabled")
+ .categories("metrics")
+ .version("0.6.0")
+ .doc("Whether to output scraped metrics to the logger. " +
+ "This config will have effect if you enabled logger sink." +
+ "If you will not scrape metrics periodically," +
+ " do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to
metrics.properties.")
+ .booleanConf
+ .createWithDefault(false)
+
val IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.identity.provider")
.withAlternative("celeborn.quota.identity.provider")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 705344381..c5407315a 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -627,6 +627,8 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
sum
}
+ // Do use this method to get metrics, because it will clear the timeMetrics
queue
+ // Do not use the LogReporter to report metrics
override def getMetrics: String = {
var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder
diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index 22048d582..284d92120 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -83,6 +83,23 @@
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <RollingRandomAccessFile name="metricsAuditFile"
fileName="${env:CELEBORN_LOG_DIR}/audit/metrics-audit.log"
+
filePattern="${env:CELEBORN_LOG_DIR}/audit/metrics-audit.log.%d-%i">
+ <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}:
%m%n%ex"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="200 MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="7">
+ <Delete basePath="${env:CELEBORN_LOG_DIR}/audit" maxDepth="1">
+ <IfFileName glob="metrics-audit.log*">
+ <IfAny>
+ <IfAccumulatedFileSize exceeds="1 GB"/>
+ <IfAccumulatedFileCount exceeds="10"/>
+ </IfAny>
+ </IfFileName>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingRandomAccessFile>
</Appenders>
<Loggers>
@@ -107,5 +124,8 @@
<Logger
name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger"
level="INFO" additivity="false">
<Appender-ref ref="shuffleAuditFile" level="INFO"/>
</Logger>
+ <Logger name="org.apache.celeborn.common.metrics.sink.LoggerSink"
level="INFO" additivity="false">
+ <Appender-ref ref="metricsAuditFile" level="INFO"/>
+ </Logger>
</Loggers>
</Configuration>
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 56ae280cf..e3b521369 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -17,3 +17,4 @@
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
+*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 77c7d6d59..148c49f07 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -26,6 +26,8 @@ license: |
| celeborn.metrics.extraLabels | | false | If default metric labels are not
enough, extra metric labels can be customized. Labels' pattern is:
`<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`; e.g.
`env=prod,version=1` | 0.3.0 | |
| celeborn.metrics.json.path | /metrics/json | false | URI context path of
json metrics HTTP server. | 0.4.0 | |
| celeborn.metrics.json.pretty.enabled | true | false | When true, view
metrics in json pretty format | 0.4.0 | |
+| celeborn.metrics.loggerSink.output.enabled | false | false | Whether to
output scraped metrics to the logger. This config will have effect if you
enabled logger sink.If you will not scrape metrics periodically, do add
`org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. |
0.6.0 | |
+| celeborn.metrics.loggerSink.scrape.interval | 30min | false | The interval
of logger sink to scrape its own metrics. This config will have effect if you
enabled logger sink. If you will not scrape metrics periodically, do add
`org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. |
0.6.0 | |
| celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context
path of prometheus metrics HTTP server. | 0.4.0 | |
| celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect
timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
|
| celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding
window size of timer metric. | 0.2.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index 85121b581..37ca3571b 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -100,9 +100,14 @@ license: |
- Since 0.6.0, the client respects the spark.celeborn.storage.availableTypes
configuration,
ensuring revived partition locations no longer default to memory storage.
In contrast, clients prior
to 0.6.0 default to memory storage for revived partitions. This means that
if memory storage is enabled in
- worker nodes, pre-0.6.0 clients may inadvertently utilize memory storage
for an application even when memory
+ worker nodes, clients prior to 0.6.0 may inadvertently utilize memory
storage for an application even when memory
storage is not enabled for that app.
+- Since 0.6.0, we have added a new sink
`org.apache.celeborn.common.metrics.sink.LoggerSink` to make sure that Celeborn
+ metrics will be scraped periodically. It's recommended to enable this sink
to make sure that worker's metrics data won't
+ be too large to cause worker OOM if you don't have a collector to scrape
metrics periodically. Don't forget to update
+ the `metrics.properties`.
+
## Upgrading from 0.5.0 to 0.5.1
- Since 0.5.1, Celeborn master REST API `/exclude` request uses media type
`application/x-www-form-urlencoded` instead of `text/plain`.
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 61fa2670a..89558cb42 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -45,7 +45,12 @@ Each instance can report to zero or more _sinks_. Sinks are
contained in the
* `CSVSink`: Exports metrics data to CSV files at regular intervals.
* `PrometheusServlet`: Adds a servlet within the existing Celeborn REST API to
serve metrics data in Prometheus format.
+* `JsonServlet`: Adds a servlet within the existing Celeborn REST API to serve
metrics data in JSON format.
* `GraphiteSink`: Sends metrics to a Graphite node.
+* `LoggerSink`: Scrapes metrics periodically and outputs them to the logger
files if you have enabled
+ `celeborn.metrics.loggerSink.output.enabled`. This is used as a safety valve
to make sure the
+ metrics data won't stay in memory for a long time. If you don't have a
metrics collector to
+ collect metrics from Celeborn periodically, it's important to enable this
sink.
The syntax of the metrics configuration file and the parameters available for
each sink are defined
in an example configuration file,
@@ -66,6 +71,8 @@ This example shows a list of Celeborn configuration
parameters for a CSV sink:
Default values of the Celeborn metrics configuration are as follows:
```
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
+*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
+*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink
```
Additional sources can be configured using the metrics configuration file or
the configuration
diff --git
a/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 3baab6ebf..ba21bb788 100644
---
a/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import com.google.common.annotations.VisibleForTesting
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.CelebornConf
@@ -39,7 +40,8 @@ class MetricsSystem(
conf: CelebornConf) extends Logging {
private[this] val metricsConfig = new MetricsConfig(conf)
- private val sinks = new ArrayBuffer[Sink]
+ @VisibleForTesting
+ val sinks = new ArrayBuffer[Sink]
private val sources = new CopyOnWriteArrayList[Source]
private val registry = new MetricRegistry()
private val prometheusServletPath = conf.get(METRICS_PROMETHEUS_PATH)
@@ -156,6 +158,11 @@ class MetricsSystem(
sources.asScala.toSeq,
jsonServletPath,
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
+ } else if (kv._1 == "loggerSink") {
+ val sink = Utils.classForName(classPath)
+ .getConstructor(classOf[Seq[Source]], classOf[CelebornConf])
+ .newInstance(sources.asScala.toSeq, conf)
+ sinks += sink.asInstanceOf[Sink]
} else {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
diff --git
a/service/src/main/scala/org/apache/celeborn/common/metrics/sink/LoggerSink.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/LoggerSink.scala
new file mode 100644
index 000000000..1b8c45f40
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/LoggerSink.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.metrics.sink
+
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.metrics.source.Source
+import org.apache.celeborn.common.util.ThreadUtils
+
+/**
+ * This sink does not follow the standard sink interface. It has the duty to
clean internal state.
+ * @param sources
+ * @param conf
+ */
+class LoggerSink(sources: Seq[Source], conf: CelebornConf) extends Sink with
Logging {
+ val metricsLoggerSinkScrapeOutputEnabled =
conf.metricsLoggerSinkScrapeOutputEnabled
+ val metricsLoggerSinkScrapeInterval = conf.metricsLoggerSinkScrapeInterval
+ val metricScrapeThread: ScheduledExecutorService =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"metrics-scrape-thread")
+ override def start(): Unit = {
+ metricScrapeThread.scheduleWithFixedDelay(
+ new Runnable {
+ override def run(): Unit = {
+ sources.foreach { source =>
+ // The method `source.getMetrics` will clear `timeMetric` queue.
+ // This is essential because the queue can be large enough
+ // to cause the worker run out of memory
+ val metricsData = source.getMetrics
+ if (metricsLoggerSinkScrapeOutputEnabled) {
+ logInfo(s"Source ${source.sourceName} scraped metrics:
${metricsData}")
+ }
+ }
+ }
+ },
+ metricsLoggerSinkScrapeInterval,
+ metricsLoggerSinkScrapeInterval,
+ TimeUnit.MILLISECONDS)
+ }
+
+ override def stop(): Unit = {
+ ThreadUtils.shutdown(metricScrapeThread)
+ }
+
+ override def report(): Unit = {}
+}
diff --git a/conf/metrics.properties.template
b/service/src/test/resources/metrics2.properties
similarity index 92%
copy from conf/metrics.properties.template
copy to service/src/test/resources/metrics2.properties
index 56ae280cf..e4ab7394b 100644
--- a/conf/metrics.properties.template
+++ b/service/src/test/resources/metrics2.properties
@@ -14,6 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
+*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/metrics/sink/LoggerSinkSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/metrics/sink/LoggerSinkSuite.scala
new file mode 100644
index 000000000..cb5859be8
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/metrics/sink/LoggerSinkSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.metrics.sink
+
+import org.apache.logging.log4j.message.SimpleMessage
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.MetricsSystem
+import org.apache.celeborn.common.metrics.sink.LoggerSink
+import org.apache.celeborn.common.metrics.source.JVMSource
+import org.apache.celeborn.common.network.TestHelper
+
+class LoggerSinkSuite extends CelebornFunSuite {
+ test("test load logger sink case") {
+ val celebornConf = new CelebornConf()
+ celebornConf
+ .set(CelebornConf.METRICS_ENABLED.key, "true")
+ .set(
+ CelebornConf.METRICS_CONF.key,
+ TestHelper.getResourceAsAbsolutePath("/metrics2.properties"))
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", celebornConf)
+ metricsSystem.registerSource(new JVMSource(celebornConf, "test"))
+ metricsSystem.start(true)
+
+ var hasLoggerSink = false
+ metricsSystem.sinks.foreach { sink =>
+ sink.isInstanceOf[LoggerSink] match {
+ case true =>
+ hasLoggerSink = true
+ case false =>
+ }
+ }
+
+ metricsSystem.stop()
+
+ assert(hasLoggerSink)
+ }
+
+ test("test logger sink configs case") {
+ val celebornConf = new CelebornConf()
+ celebornConf
+ .set(CelebornConf.METRICS_ENABLED.key, "true")
+ .set(
+ CelebornConf.METRICS_CONF.key,
+ TestHelper.getResourceAsAbsolutePath("/metrics2.properties"))
+ celebornConf.set("celeborn.metrics.loggerSink.scrape.interval", "10s")
+ celebornConf.set("celeborn.metrics.loggerSink.output.enabled", "true")
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", celebornConf)
+ metricsSystem.registerSource(new JVMSource(celebornConf, "test"))
+ metricsSystem.start(true)
+
+ metricsSystem.sinks.foreach { sink =>
+ sink.isInstanceOf[LoggerSink] match {
+ case true =>
+ val loggerSink = sink.asInstanceOf[LoggerSink]
+ assert(loggerSink.metricsLoggerSinkScrapeOutputEnabled == true)
+ assert(loggerSink.metricsLoggerSinkScrapeInterval == 10000)
+ case false =>
+ }
+ }
+
+ metricsSystem.stop()
+ }
+
+ test("test logger sink validity case") {
+ val celebornConf = new CelebornConf()
+ celebornConf
+ .set(CelebornConf.METRICS_ENABLED.key, "true")
+ .set(
+ CelebornConf.METRICS_CONF.key,
+ TestHelper.getResourceAsAbsolutePath("/metrics2.properties"))
+ celebornConf.set("celeborn.metrics.loggerSink.scrape.interval", "3s")
+ celebornConf.set("celeborn.metrics.loggerSink.output.enabled", "true")
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", celebornConf)
+ val jvmSource = new JVMSource(celebornConf, "test")
+ metricsSystem.registerSource(jvmSource)
+ metricsSystem.start(true)
+
+ jvmSource.timerMetrics.add("test1")
+ jvmSource.timerMetrics.add("test2")
+ jvmSource.timerMetrics.add("test3")
+ jvmSource.timerMetrics.add("test4")
+ jvmSource.timerMetrics.add("test5")
+ Thread.sleep(100)
+ assert(jvmSource.timerMetrics.size() != 0)
+ Thread.sleep(10000)
+ metricsSystem.stop()
+
+ assert(jvmSource.timerMetrics.size() == 0)
+ }
+}