Hi Yushu,

Wondering, how did you configure your Spark metrics sink? And what version of Spark are you using?
Key is to configure Spark to use one of the sinks provided by Beam, e.g.:

    "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"

Currently there's support for CSV and Graphite, but it's simple to support others. The sinks typically wrap the corresponding Spark sinks and integrate with the metrics registry of the Spark metric system.

https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it's fairly common to run into classpath issues :/ The metric system is loaded when an executor starts up; by then the application classpath isn't available yet. Typically, that means distributing such code by other means.

/Moritz

On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Team,

Does anyone have a working example of a Beam job running on top of Spark, so that I can use the Beam metric syntax and have the metrics shipped out via Spark's infra? The only thing I achieved is to be able to call queryMetrics() every half second and copy all the metrics into the Spark metrics. Wondering if there is a better way?

Thanks!
-Yushu

    MetricQueryResults metrics =
        pipelineResult
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.inNamespace(namespace))
                    .build());

    for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
      LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
      com.codahale.metrics.Gauge gauge = new SimpleBeamGauge(cc.getAttempted().getValue());
      try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        if (registry.getNames().contains(name)) {
          LOGGER.info("Removing metric {}", name);
          registry.remove(name);
        }
        registry.register(name, gauge);
      } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
      }
    }
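The sink configuration from the reply could be assembled roughly as below. This is only a sketch, not a tested setup. The only setting taken from the thread is the `sink.csv.class` key and its Beam `CsvSink` value. The `directory`, `period` and `unit` keys are the options of Spark's own CSV sink and are assumed to be passed through by the Beam wrapper. The class name `BeamSparkMetricsConf`, its helper methods and the `/tmp/beam-metrics` path are hypothetical, made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: collects the Spark properties for a Beam-provided CSV sink.
class BeamSparkMetricsConf {
    // Prefix for metrics settings that are given as Spark properties
    // instead of in a metrics.properties file.
    static final String PREFIX = "spark.metrics.conf.";

    static Map<String, String> csvSinkConf(String directory, int periodSeconds) {
        Map<String, String> conf = new LinkedHashMap<>();
        // From the thread: use the sink class provided by Beam.
        conf.put(PREFIX + "*.sink.csv.class",
                "org.apache.beam.runners.spark.metrics.sink.CsvSink");
        // Assumption: options of Spark's CSV sink, passed through by the wrapper.
        conf.put(PREFIX + "*.sink.csv.directory", directory);
        conf.put(PREFIX + "*.sink.csv.period", String.valueOf(periodSeconds));
        conf.put(PREFIX + "*.sink.csv.unit", "seconds");
        return conf;
    }

    // Renders the settings as spark-submit arguments; the quotes keep the shell
    // from expanding the "*" in the keys.
    static String toSubmitArgs(Map<String, String> conf) {
        return conf.entrySet().stream()
                .map(e -> "--conf \"" + e.getKey() + "=" + e.getValue() + "\"")
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(toSubmitArgs(csvSinkConf("/tmp/beam-metrics", 10)));
    }
}
```

The same key/value pairs could equally be set on the `SparkConf` or placed, without the `spark.metrics.conf.` prefix, in a `metrics.properties` file. None of this addresses the classpath caveat mentioned in the reply: the sink class still has to be visible to the executor when the metric system starts.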