Yes, you would have to wrap Spark's JmxSink the same way it's done for the
CSV and Graphite ones, see [1].
This is necessary to expose the Gauges provided by Beam to the sinks.
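For illustration, such a wrapper could mirror Beam's existing CsvSink / GraphiteSink wrappers. This is only a sketch: the class name is hypothetical, and the three-argument constructor follows the sink contract of Spark 3.1 as used by Beam 2.40 (Spark 3.2 dropped the SecurityManager parameter, so adjust accordingly):

```java
import java.util.Properties;

import com.codahale.metrics.MetricRegistry;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;

/**
 * Hypothetical JMX sink for Beam on Spark, analogous to Beam's CsvSink and
 * GraphiteSink: it hands Spark's JmxSink a registry decorated by
 * WithMetricsSupport so that Beam Gauges become visible over JMX.
 */
public class JmxSink extends org.apache.spark.metrics.sink.JmxSink {
  public JmxSink(
      final Properties properties,
      final MetricRegistry metricRegistry,
      final org.apache.spark.SecurityManager securityMgr) {
    // Decorating the registry is what exposes Beam metrics to the sink.
    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
  }
}
```

It would then be wired up like the other sinks, e.g. via a
"spark.metrics.conf.*.sink.jmx.class" property pointing at this class.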

However, if you are on Spark 3 it's possible to use the new plugin framework of
Spark (see [2]). That's something I was planning to work on but haven't found
time yet. Using a driver plugin, the respective gauges could be registered in
Spark's internal metrics registry (available from the PluginContext [3])
without using custom sinks.
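A minimal sketch of that plugin approach could look as follows (the class name and the gauge are illustrative; a real implementation would pull values from Beam's MetricResults rather than the constant used here):

```java
import java.util.Collections;
import java.util.Map;

import com.codahale.metrics.Gauge;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.api.plugin.SparkPlugin;

/** Hypothetical Spark 3 plugin registering a gauge in Spark's metric registry. */
public class BeamMetricsPlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new DriverPlugin() {
      @Override
      public Map<String, String> init(SparkContext sc, PluginContext ctx) {
        return Collections.emptyMap();
      }

      @Override
      public void registerMetrics(String appId, PluginContext ctx) {
        // PluginContext gives direct access to Spark's internal metric registry.
        ctx.metricRegistry().register("beamExampleGauge", (Gauge<Long>) () -> 42L);
      }
    };
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return null; // everything happens on the driver
  }
}
```

The plugin would be enabled by listing its fully qualified name in the
spark.plugins configuration property.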

Btw, all of this happens on the driver, so there's no trouble to expect with
the executor classpath.

/ Moritz


[1] 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java#L35
[2] 
https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/SparkPlugin.html
[3] 
https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/PluginContext.html#metricRegistry--


On 15.07.22, 07:47, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Thanks Moritz!

We are using jmx to ship the metrics out of spark. Most of the spark built-in 
driver and executor metrics are going out fine.
Does this require us to make another sink?

-Yushu


On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack <mm...@talend.com> wrote:
Hi Yushu,

Wondering, how did you configure your Spark metrics sink? And what version of 
Spark are you using?

Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"

Currently there’s support for CSV and Graphite, but it’s simple to support 
others. The sinks typically wrap the corresponding Spark sinks and integrate 
with the Metrics registry of the Spark metric system.
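For example, a complete sink configuration might look like this (the directory and period values are illustrative; the keys follow Spark's standard CSV sink options):

```properties
# metrics.properties, illustrative values
*.sink.csv.class=org.apache.beam.runners.spark.metrics.sink.CsvSink
*.sink.csv.directory=/tmp/spark-metrics
*.sink.csv.period=10
*.sink.csv.unit=seconds
```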

https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it's fairly common to run into classpath issues
:/ The metric system is loaded when an executor starts up; by then the
application classpath isn't available yet. Typically, that means distributing
such code by other means.

/Moritz


On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Team,
Does anyone have a working example of a beam job running on top of spark? So 
that I can use the beam metric syntax and the metrics will be shipped out via 
spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second and 
copy all the metrics into the spark metrics.
Wondering if there is a better way?

Thanks!
-Yushu


// Query Beam gauges in the given namespace and mirror them into the
// Spark (Dropwizard) metric registry.
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
    LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
    com.codahale.metrics.Gauge gauge =
        new SimpleBeamGauge(cc.getAttempted().getValue());
    try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        // Dropwizard registries reject duplicate names, so remove before
        // re-registering to refresh the value.
        if (registry.getNames().contains(name)) {
            LOGGER.info("Removing metric {}", name);
            registry.remove(name);
        }
        registry.register(name, gauge);
    } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
    }
}

As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice. <https://www.talend.com/privacy/>



