Yes, that’s exactly what I was referring to. A - hopefully - easy way to avoid this problem might be to change the Spark configuration to use the following:

  --conf "spark.metrics.conf.driver.sink.jmx.class"="com.salesforce.einstein.data.platform.connectors.JmxSink"
  --conf "spark.metrics.conf.executor.sink.jmx.class"="org.apache.spark.metrics.sink.JmxSink"

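For reference, the same two sink settings can also be written in Spark's metrics configuration file instead of on the command line. This is only a sketch: it assumes the file is $SPARK_HOME/conf/metrics.properties (or whatever spark.metrics.conf points to) and that the file is readable by both the driver and the executors, which is not something the thread confirms.

```properties
# Driver: custom sink wrapping JmxSink, so that Beam metrics are exposed.
driver.sink.jmx.class=com.salesforce.einstein.data.platform.connectors.JmxSink

# Executors: Spark's stock JmxSink, which is on the system classpath already.
executor.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
```
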
Beam metrics are only exposed on the driver, so it’s enough to use your custom sink on the driver. Those classpath issues are limited to executor nodes, if I remember right (and I’d be surprised to see the same on the driver). Though, I haven’t tested …

By distributing such code by other means, I meant adding the relevant classes to the system classpath of the executor ($SPARK_HOME/jars). How to do that depends heavily on your infrastructure. For example, on Kubernetes it could be a base image that already contains the custom sink; on EMR it could be a bootstrap action that copies the sink from S3 … Of course, that’s not great because it affects everything running on the cluster, with all the potential problems this creates.

/ Moritz

On 16.07.22, 07:54, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Moritz,

> When dealing with custom sinks, it’s fairly common to run into classpath issues :/ The metric system is loaded when an executor starts up, by then the application classpath isn’t available yet. Typically, that means distributing such code by other means.

Could you also elaborate a bit about "distributing such code by other means"? I wrapped JmxSink (just like CsvSink for Beam) and now get the following error:

ERROR MetricsSystem: Sink class com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated
Caused by: java.lang.ClassNotFoundException: com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above.
Any hints on how to fix it will be greatly appreciated.

Thanks!
-Yushu

/Moritz

On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Team,

Does anyone have a working example of a Beam job running on top of Spark, so that I can use the Beam metric syntax and the metrics will be shipped out via Spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second and copy all the metrics into the Spark metrics. Wondering if there is a better way?

Thanks!
-Yushu

    // Query all Beam metrics in the given namespace.
    MetricQueryResults metrics =
        pipelineResult
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.inNamespace(namespace))
                    .build());

    // Copy each Beam gauge into the Spark (Dropwizard) metric registry.
    for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
      LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
      com.codahale.metrics.Gauge gauge = new SimpleBeamGauge(cc.getAttempted().getValue());
      try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        // Replace a previously registered gauge of the same name.
        if (registry.getNames().contains(name)) {
          LOGGER.info("Removing metric {}", name);
          registry.remove(name);
        }
        registry.register(name, gauge);
      } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
      }
    }

As a recipient of an email from Talend, your contact personal data will be on our systems. Please see our privacy notice. <https://www.talend.com/privacy/>
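
If the polling approach from the first message is kept, one possible refinement is to register each gauge only once and let it read from a mutable holder, so that every poll just updates a value instead of removing and re-registering the metric. The following is a minimal sketch of that idea using only the JDK. GaugeMirror and its methods are hypothetical names, and a plain map of LongSupplier stands in for the Dropwizard MetricRegistry and Gauge used in the thread; it has not been tested against Spark or Beam.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper: mirrors polled values into gauges that are registered only once. */
final class GaugeMirror {
    // Latest polled value per metric name.
    private final Map<String, AtomicLong> latest = new ConcurrentHashMap<>();
    // Stand-in for a metric registry; LongSupplier plays the role of a gauge.
    private final Map<String, LongSupplier> registry = new ConcurrentHashMap<>();

    /** Called on every poll: registers a gauge on first sight, afterwards only updates its value. */
    void update(String name, long value) {
        AtomicLong holder = latest.computeIfAbsent(name, n -> {
            AtomicLong created = new AtomicLong(value);
            registry.put(n, created::get); // the gauge reads the holder lazily
            return created;
        });
        holder.set(value);
    }

    /** Reads a value through the registered gauge, as a reporter or sink would. */
    long read(String name) {
        LongSupplier gauge = registry.get(name);
        if (gauge == null) {
            throw new IllegalArgumentException("Unknown gauge: " + name);
        }
        return gauge.getAsLong();
    }

    /** Number of gauges registered so far. */
    int registeredCount() {
        return registry.size();
    }

    public static void main(String[] args) {
        GaugeMirror mirror = new GaugeMirror();
        mirror.update("ns.records", 10L); // first poll registers the gauge
        mirror.update("ns.records", 25L); // later polls only change the value
        System.out.println(mirror.read("ns.records"));
        System.out.println(mirror.registeredCount());
    }
}
```

With a real registry, the holder-backed gauge would be registered where the sketch calls registry.put, and the polling loop would call update(name, value) for each gauge result.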