Hi Yushu,

I'm wondering how you configured your Spark metrics sink. And which version of
Spark are you using?

The key is to configure Spark to use one of the sinks provided by Beam, e.g.:
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
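A minimal sketch of assembling that setting programmatically, e.g. to build `--conf` flags for spark-submit. The `directory`, `period` and `unit` keys are options of Spark's own CSV sink, which the Beam sink wraps; that they are passed through unchanged is an assumption here. The class name `BeamCsvSinkConf` and the directory/period values are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: builds Spark properties that route metrics to Beam's CSV sink. */
public class BeamCsvSinkConf {

    // SparkConf entries under "spark.metrics.conf." are handed to Spark's metrics system.
    static final String PREFIX = "spark.metrics.conf.*.sink.csv.";

    public static Map<String, String> csvSinkConf(String directory, int period, String unit) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Sink class shipped with Beam's Spark runner.
        conf.put(PREFIX + "class", "org.apache.beam.runners.spark.metrics.sink.CsvSink");
        // Options of Spark's CsvSink (assumed to be forwarded by the Beam wrapper).
        conf.put(PREFIX + "directory", directory);
        conf.put(PREFIX + "period", Integer.toString(period));
        conf.put(PREFIX + "unit", unit);
        return conf;
    }

    /** Turns the properties into "--conf key=value" argument pairs (e.g. for ProcessBuilder). */
    public static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        // Assumed values: write CSV files to /tmp/spark-metrics every 10 seconds.
        Map<String, String> conf = csvSinkConf("/tmp/spark-metrics", 10, "seconds");
        // Single quotes keep a shell from treating '*' as a glob when pasted.
        conf.forEach((k, v) -> System.out.println("--conf '" + k + "=" + v + "'"));
    }
}
```

The same key/value pairs can equally be set on a `SparkConf` or in a `metrics.properties` file (without the `spark.metrics.conf.` prefix).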

Currently there's support for CSV and Graphite, but it's simple to support
others. The sinks typically wrap the corresponding Spark sinks and integrate
with the metrics registry of Spark's metrics system.

https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it's fairly common to run into classpath issues
:/ The metrics system is loaded when an executor starts up, and at that point
the application classpath isn't available yet. Typically, that means such code
has to be distributed by other means.
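One common option, sketched here under assumptions, is to install the jar at the same path on every node beforehand and put it on the JVM classpath at launch via Spark's `spark.executor.extraClassPath` (and `spark.driver.extraClassPath` for the driver), instead of relying on jars shipped with the application. The jar path and the helper name `SinkClasspathConf` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: puts a pre-installed sink jar on the driver and executor JVM classpaths. */
public class SinkClasspathConf {

    public static Map<String, String> withSinkJar(Map<String, String> conf, String jarPath) {
        // Copy so the caller's map is left untouched.
        Map<String, String> out = new LinkedHashMap<>(conf);
        for (String key : new String[] {"spark.driver.extraClassPath", "spark.executor.extraClassPath"}) {
            // Append to an existing entry rather than overwrite it.
            // ':' is the path separator on the (assumed Linux) cluster nodes.
            String existing = out.get(key);
            out.put(key, existing == null || existing.isEmpty() ? jarPath : existing + ":" + jarPath);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: the jar was copied to this path on every node beforehand.
        Map<String, String> conf =
            withSinkJar(new LinkedHashMap<>(), "/opt/spark/extra/my-metrics-sink.jar");
        conf.forEach((k, v) -> System.out.println("--conf " + k + "=" + v));
    }
}
```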

/Moritz


On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Team,
Does anyone have a working example of a Beam job running on top of Spark, so
that I can use the Beam metrics syntax and have the metrics shipped out via
Spark's infrastructure?

The only thing I've managed so far is to call queryMetrics() every half second
and copy all the metrics into the Spark metrics registry.
Is there a better way?

Thanks!
-Yushu


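import com.codahale.metrics.Gauge;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

// pipelineResult, namespace, registry (a Dropwizard MetricRegistry), metricName(),
// SimpleBeamGauge, addStepName, addNamespace and LOGGER are defined elsewhere.
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
    LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
    // Wrap the latest attempted Beam gauge value in a Dropwizard gauge.
    Gauge<?> gauge = new SimpleBeamGauge(cc.getAttempted().getValue());
    try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        // Replace the gauge registered under this name by a previous poll.
        if (registry.getNames().contains(name)) {
            LOGGER.info("Removing metric {}", name);
            registry.remove(name);
        }
        registry.register(name, gauge);
    } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
    }
}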
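For comparison, the polling workaround described in the message amounts to re-running a copy step like the loop above on a fixed 500 ms schedule. A minimal sketch with `ScheduledExecutorService`; the `MetricsPoller` class is hypothetical, and the `Runnable` passed to `start` stands in for that query-and-register loop.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the polling workaround: re-run a metrics copy step periodically. */
public class MetricsPoller implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beam-metrics-poller");
            t.setDaemon(true); // don't keep the JVM alive just for polling
            return t;
        });

    /** copyMetrics stands in for the queryMetrics()/registry.register loop. */
    public ScheduledFuture<?> start(Runnable copyMetrics, long periodMillis) {
        Runnable safe = () -> {
            try {
                copyMetrics.run();
            } catch (RuntimeException e) {
                // Without this catch, one failure would suppress all later runs.
                System.err.println("metrics copy failed: " + e);
            }
        };
        return scheduler.scheduleAtFixedRate(safe, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (MetricsPoller poller = new MetricsPoller()) {
            poller.start(() -> System.out.println("copying metrics..."), 500);
            Thread.sleep(1600); // stands in for the pipeline running
        }
    }
}
```

Unlike a configured sink, this copies values only as often as the poll runs and needs the driver program to stay alive while the pipeline executes.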
