Yes, that’s exactly what I was referring to.

A hopefully easy way to avoid this problem might be to change the Spark 
configuration to use the following:
--conf "spark.metrics.conf.driver.sink.jmx.class"="com.salesforce.einstein.data.platform.connectors.JmxSink"
--conf "spark.metrics.conf.executor.sink.jmx.class"="org.apache.spark.metrics.sink.JmxSink"

Beam metrics are only exposed on the driver, so it’s enough to use your custom 
Sink on the driver. If I remember right, those classpath issues are limited to 
executor nodes (and I’d be surprised to see the same on the driver). I haven’t 
tested this, though …
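
To make the driver/executor split explicit, here is a minimal sketch in plain Java that only assembles the two settings as spark-submit arguments. The class name MetricsSinkConf and its two helper methods are hypothetical and exist only for illustration; the property keys and sink class names are the ones quoted above. Like the configuration itself, this is untested against a real cluster.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spark or Beam): builds the per-instance
// metrics sink settings so the custom sink is only requested on the driver.
final class MetricsSinkConf {

    // Returns the two properties in a stable order: driver first, then executor.
    static Map<String, String> sinkConf(String driverSink, String executorSink) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.metrics.conf.driver.sink.jmx.class", driverSink);
        conf.put("spark.metrics.conf.executor.sink.jmx.class", executorSink);
        return conf;
    }

    // Flattens the properties into "--conf key=value" pairs for spark-submit.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> submitArgs = new ArrayList<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            submitArgs.add("--conf");
            submitArgs.add(entry.getKey() + "=" + entry.getValue());
        }
        return submitArgs;
    }

    public static void main(String[] args) {
        Map<String, String> conf = sinkConf(
            "com.salesforce.einstein.data.platform.connectors.JmxSink", // custom sink, driver only
            "org.apache.spark.metrics.sink.JmxSink");                   // stock sink on executors
        System.out.println(String.join(" ", toSubmitArgs(conf)));
    }
}
```

The same key/value pairs could presumably also be set on the SparkConf before the context is created, but that is an assumption I have not verified either.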

By distributing such code by other means, I meant adding the relevant classes 
to the system classpath of the executor ($SPARK_HOME/jars). How to do that 
depends heavily on your infrastructure. For example, on Kubernetes it could be 
a base image that already contains the custom sink; on EMR it could be a 
bootstrap action that copies the sink from S3 … Of course, that’s not great 
because it affects everything running on the cluster with all the potential 
problems this creates.
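
If you go down that route, a quick way to check whether the class actually ended up on the JVM classpath of a node is a tiny diagnostic like the one below. It is a sketch only: SinkClasspathCheck is a hypothetical name, and it merely asks the system class loader for the class without initializing it. Whether Spark's metrics system resolves the sink through exactly this loader is an assumption on my side.

```java
// Hypothetical diagnostic (not part of Spark or Beam): reports whether a class
// can be resolved from the JVM's system classpath.
final class SinkClasspathCheck {

    // Returns true if className is loadable by the system class loader.
    // The class is not initialized, so no static initializers are run.
    static boolean onSystemClasspath(String className) {
        try {
            Class.forName(className, false, ClassLoader.getSystemClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
            ? args[0]
            : "com.salesforce.einstein.data.platform.connectors.JmxSink";
        System.out.println(name + " on system classpath: " + onSystemClasspath(name));
    }
}
```

Running it on an executor host with something like java -cp ".:$SPARK_HOME/jars/*" SinkClasspathCheck should print true once the jar containing the sink has been copied there; the exact invocation depends on your setup.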

/ Moritz



On 16.07.22, 07:54, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Moritz,


When dealing with custom sinks, it’s fairly common to run into classpath issues 
:/ The metric system is loaded when an executor starts up, by then the 
application classpath isn’t available yet. Typically, that means distributing 
such code by other means.


Could you also elaborate a bit about "distributing such code by other means"?

I wrapped JmxSink (just like CsvSink for Beam) and now get the following 
error:

ERROR MetricsSystem: Sink class com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated
 Caused by: java.lang.ClassNotFoundException: com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above. Any hints on 
how to fix it will be greatly appreciated.
Thanks!
-Yushu

/Moritz


On 14.07.22, 21:26, "Yushu Yao" 
<yao.yu...@gmail.com<mailto:yao.yu...@gmail.com>> wrote:

Hi Team,
Does anyone have a working example of a Beam job running on top of Spark, so 
that I can use the Beam metric syntax and have the metrics shipped out via 
Spark's infra?

The only thing I have achieved so far is to call queryMetrics() every half 
second and copy all the metrics into the Spark metrics.
I'm wondering if there is a better way.

Thanks!
-Yushu


MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
    LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
    com.codahale.metrics.Gauge gauge =
        new SimpleBeamGauge(cc.getAttempted().getValue());
    try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        if (registry.getNames().contains(name)) {
            LOGGER.info("Removing metric {}", name);
            registry.remove(name);
        }
        registry.register(name, gauge);
    } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
    }
}
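
For completeness, the half-second polling described above could be driven by a scheduled executor. This is only a sketch of the scheduling part using the JDK: MetricsPoller is a hypothetical name, and the Runnable passed in stands for the query-and-copy loop shown above, which is not reproduced here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: runs a metrics-copy task at a fixed rate on a daemon thread.
final class MetricsPoller implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beam-metrics-poller");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });

    MetricsPoller(Runnable copyMetrics, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                copyMetrics.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so report and continue.
                System.err.println("Metrics copy failed: " + e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller would create it with the copy loop and a period of 500 (milliseconds) after starting the pipeline, and close it once the pipeline has finished.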

As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice. <https://www.talend.com/privacy/>

