Hi Team,
Does anyone have a working example of a Beam job running on top of Spark,
where I can use the Beam metrics syntax and have the metrics shipped out
through Spark's metrics infrastructure?
The only thing I have managed so far is to call queryMetrics() every half
second and copy all the metrics into Spark's metrics registry (code below).
Is there a better way?
Thanks!
-Yushu
// Query all Beam metrics in the given namespace.
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());
for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
  LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
  // Wrap the attempted gauge value in a Dropwizard gauge.
  com.codahale.metrics.Gauge gauge =
      new SimpleBeamGauge(cc.getAttempted().getValue());
  try {
    String name = metricName(cc.getKey(), addStepName, addNamespace);
    // Replace any gauge already registered under this name.
    if (registry.getNames().contains(name)) {
      LOGGER.info("Removing metric {}", name);
      registry.remove(name);
    }
    registry.register(name, gauge);
  } catch (IllegalArgumentException e) {
    LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
  }
}
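
For reference, the half-second polling loop that drives the snippet above
could be sketched roughly as follows. This is a JDK-only illustration, not
Beam or Spark API: MetricsPoller, copyOnce and start are hypothetical names,
the Supplier stands in for the queryMetrics() call, and the map of AtomicLong
holders stands in for the metrics registry. The sketch updates each holder in
place instead of removing and re-registering it on every poll.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical sketch: poll a metrics snapshot and mirror it into a registry. */
final class MetricsPoller {

  private MetricsPoller() {}

  /**
   * Copies one snapshot into the registry. A holder is created the first time
   * a name is seen and is updated in place afterwards, so nothing has to be
   * removed and re-registered on each poll.
   */
  static void copyOnce(Map<String, Long> snapshot, Map<String, AtomicLong> registry) {
    for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
      registry.computeIfAbsent(entry.getKey(), k -> new AtomicLong()).set(entry.getValue());
    }
  }

  /**
   * Runs copyOnce repeatedly with a fixed delay between runs.
   * The caller is responsible for shutting down the returned executor.
   */
  static ScheduledExecutorService start(
      Supplier<Map<String, Long>> query, Map<String, AtomicLong> registry, long periodMillis) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleWithFixedDelay(
        () -> {
          try {
            copyOnce(query.get(), registry);
          } catch (RuntimeException e) {
            // An uncaught exception would cancel all future runs, so swallow it here.
            System.err.println("Metrics poll failed: " + e);
          }
        },
        0,
        periodMillis,
        TimeUnit.MILLISECONDS);
    return executor;
  }
}
```

With this shape, something like MetricsPoller.start(query, registry, 500)
would poll every half second, where query is whatever function turns the
queryMetrics() result into a name-to-value map. If the registry's gauges read
from holders like these, the remove/register step is only needed the first
time a metric name appears.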