Hi Moritz,
> > When dealing with custom sinks, it’s fairly common to run into classpath > issues :/ The metric system is loaded when an executor starts up, by then > the application classpath isn’t available yet. Typically, that means > distributing such code by other means. > > > Could you also elaborate a bit about "distributing such code by other means"? I wrapped JmxSink (just like CsvSink for beam). And now gets the following error. *ERROR MetricsSystem: Sink class com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated* * Caused by: java.lang.ClassNotFoundException: com.salesforce.einstein.data.platform.connectors.JmxSink* I guess this is the classpath issue you were referring to above. Any hints on how to fix it will be greatly appreciated. Thanks! -Yushu /Moritz > > > > > > On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote: > > > > Hi Team, Does anyone have a working example of a beam job running on top > of spark? So that I can use the beam metric syntax and the metrics will be > shipped out via spark's infra? The only thing I achieved is to be able to > queryMetrics() > > ZjQcmQRYFpfptBannerStart > > *This Message Is From an External Sender * > > This message came from outside your organization. > > Exercise caution when opening attachments or clicking any links. > > ZjQcmQRYFpfptBannerEnd > > Hi Team, > > Does anyone have a working example of a beam job running on top of spark? > So that I can use the beam metric syntax and the metrics will be shipped > out via spark's infra? > > > > The only thing I achieved is to be able to queryMetrics() every half > second and copy all the metrics into the spark metrics. > > Wondering if there is a better way? > > > > Thanks! > > -Yushu > > > > MetricQueryResults metrics = > pipelineResult > .metrics() > .queryMetrics( > MetricsFilter.*builder*() > .addNameFilter(MetricNameFilter.*inNamespace*(namespace)) > .build()); > > for (MetricResult<GaugeResult> cc : metrics.getGauges()) { > *LOGGER*.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted()); > com.codahale.metrics.Gauge gauge = > new SimpleBeamGauge(cc.getAttempted().getValue()); > try { > String name = metricName(cc.getKey(), addStepName, addNamespace); > if (registry.getNames().contains(name)) { > *LOGGER*.info("Removing metric {}", name); > registry.remove(name); > } > registry.register(name, gauge); > } catch (IllegalArgumentException e) { > *LOGGER*.warn("Duplicated metrics found. Try turning on > addStepName=true.", e); > } > } > > *As a recipient of an email from Talend, your contact personal data will > be on our systems. Please see our privacy notice. > <https://www.talend.com/privacy/>* > > >