Yes, you would have to wrap the Spark JmxSink the same way it’s done for the CSV or Graphite one, see [1]. This is necessary to expose Gauges provided by Beam to the sinks.
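For illustration, such a wrapper could look like the following Java sketch, modeled on Beam's GraphiteSink [1]. This is an untested outline, not Beam's actual code: the constructor signature is an assumption (it matches the sink constructors of Spark 3.1; later Spark versions dropped the SecurityManager argument), and WithMetricsSupport is the Beam helper that decorates the registry so Beam's metrics become visible to the wrapped Spark sink:

```java
package org.apache.beam.runners.spark.metrics.sink;

import com.codahale.metrics.MetricRegistry;
import java.util.Properties;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;

// Hypothetical JMX sink wrapper, following the same pattern as
// Beam's GraphiteSink/CsvSink: delegate to the Spark sink, but hand
// it a registry decorated with Beam's metrics support so the Beam
// Gauges are exposed alongside Spark's own metrics.
public class JmxSink extends org.apache.spark.metrics.sink.JmxSink {
  public JmxSink(
      Properties properties,
      MetricRegistry metricRegistry,
      org.apache.spark.SecurityManager securityMgr) {
    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
  }
}
```

It would then be enabled like the other Beam sinks, e.g. "spark.metrics.conf.*.sink.jmx.class"="org.apache.beam.runners.spark.metrics.sink.JmxSink".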
However, if you are on Spark 3 it's possible to use the new plugin framework of Spark (see [2]). That's something I was planning to work on but haven't found time yet. Using a driver plugin, the respective gauges could be registered in Spark's internal metrics registry (available from the PluginContext [3]) without using custom sinks. Btw, all of this is happening on the driver, so there's no trouble to expect with the executor classpath.

/ Moritz

[1] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java#L35
[2] https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/SparkPlugin.html
[3] https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/PluginContext.html#metricRegistry

On 15.07.22, 07:47, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Thanks Moritz! We are using JMX to ship the metrics out of Spark. Most of the Spark built-in driver and executor metrics are going out fine. Does this require us to make another sink?

-Yushu

On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack <mm...@talend.com> wrote:

Hi Yushu,

Wondering, how did you configure your Spark metrics sink? And what version of Spark are you using? The key is to configure Spark to use one of the sinks provided by Beam, e.g.:

"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"

Currently there's support for CSV and Graphite, but it's simple to support others.
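For context, wiring one of Beam's sinks into the Spark metric system is plain Spark metrics configuration. A minimal metrics.properties sketch (the directory and period values are illustrative):

```
# Route all instances' metrics through Beam's CSV sink wrapper.
# Equivalent to passing --conf spark.metrics.conf.*.sink.csv.class=... etc.
*.sink.csv.class=org.apache.beam.runners.spark.metrics.sink.CsvSink
*.sink.csv.directory=/tmp/spark-metrics
*.sink.csv.period=10
*.sink.csv.unit=seconds
```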
The sinks typically wrap the corresponding Spark sinks and integrate with the MetricRegistry of the Spark metric system:
https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it's fairly common to run into classpath issues: the metric system is loaded when an executor starts up, and by then the application classpath isn't available yet. Typically, that means distributing such code by other means.

/Moritz

On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:

Hi Team,

Does anyone have a working example of a Beam job running on top of Spark, so that I can use the Beam metric syntax and have the metrics shipped out via Spark's infra? The only thing I achieved is to queryMetrics() every half second and copy all the metrics into the Spark metrics registry. Wondering if there is a better way? Thanks!
-Yushu

    MetricQueryResults metrics =
        pipelineResult
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.inNamespace(namespace))
                    .build());

    for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
      LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
      com.codahale.metrics.Gauge gauge = new SimpleBeamGauge(cc.getAttempted().getValue());
      try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        if (registry.getNames().contains(name)) {
          LOGGER.info("Removing metric {}", name);
          registry.remove(name);
        }
        registry.register(name, gauge);
      } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
      }
    }
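As an alternative to the polling loop above, the Spark 3 driver-plugin approach Moritz describes in his latest reply could be sketched roughly as follows. This is an illustrative, untested outline: the class and metric names are made up, and the gauge body (querying the Beam PipelineResult) is left schematic. Spark calls DriverPlugin.registerMetrics once the metric system is ready, and PluginContext.metricRegistry [3] exposes Spark's internal registry:

```java
import com.codahale.metrics.Gauge;
import java.util.Collections;
import java.util.Map;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.api.plugin.SparkPlugin;

// Hypothetical plugin that registers a Beam gauge with Spark's own
// metric registry on the driver, so no custom sink is required.
public class BeamMetricsPlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new DriverPlugin() {
      @Override
      public Map<String, String> init(SparkContext sc, PluginContext ctx) {
        return Collections.emptyMap();
      }

      @Override
      public void registerMetrics(String appId, PluginContext ctx) {
        ctx.metricRegistry().register("beam.myGauge", (Gauge<Long>) () -> {
          // Look up the attempted value of the Beam gauge here, e.g. via
          // pipelineResult.metrics().queryMetrics(...), as in the snippet above.
          return 0L;
        });
      }
    };
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return null; // everything happens on the driver
  }
}
```

Enabled via spark.plugins with the fully qualified class name; the existing JMX sink would then export the registered gauge like any other Spark metric.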