Hi Team,
Does anyone have a working example of a Beam job running on top of Spark,
so that I can use the Beam metrics syntax and have the metrics shipped
out via Spark's metrics infrastructure?

The only thing I've achieved so far is to queryMetrics() every half second
and copy all the Beam metrics into the Spark metric registry (snippet below).
Wondering if there is a better way?

Thanks!
-Yushu

import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

// Query all Beam metrics in our namespace from the pipeline result.
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

// Mirror each Beam gauge into the Spark (Dropwizard) metric registry.
for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
    LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
    com.codahale.metrics.Gauge gauge =
        new SimpleBeamGauge(cc.getAttempted().getValue());
    try {
        String name = metricName(cc.getKey(), addStepName, addNamespace);
        // register() throws on a duplicate name, so drop any stale copy first.
        if (registry.getNames().contains(name)) {
            LOGGER.info("Removing metric {}", name);
            registry.remove(name);
        }
        registry.register(name, gauge);
    } catch (IllegalArgumentException e) {
        LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
    }
}
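
For completeness, here's a rough sketch of how the half-second polling is
wired up. SimpleBeamGauge is just a fixed-value Dropwizard gauge, and
BeamMetricsPoller / copyBeamMetrics are placeholder names for my wiring,
not Beam or Spark API:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.beam.sdk.PipelineResult;

/** Fixed-value Dropwizard gauge holding a snapshot of a Beam gauge. */
class SimpleBeamGauge implements Gauge<Long> {
    private final long value;

    SimpleBeamGauge(long value) {
        this.value = value;
    }

    @Override
    public Long getValue() {
        return value;
    }
}

/** Polls Beam metrics every 500 ms and mirrors them into the Spark registry. */
class BeamMetricsPoller {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    void start(PipelineResult pipelineResult, MetricRegistry registry) {
        scheduler.scheduleAtFixedRate(
            () -> copyBeamMetrics(pipelineResult, registry),
            0, 500, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdown();
    }

    private void copyBeamMetrics(PipelineResult pipelineResult, MetricRegistry registry) {
        // ... the queryMetrics()/register() loop from the snippet above ...
    }
}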
