Hi all,
I am trying to write a custom Source that counts errors and reports them
through Spark's sink mechanism (CSV or JMX), and I am having some trouble
understanding how this works.
1. I defined the Source, added counters created with a MetricRegistry, and
registered the Source:
> SparkEnv.get().metricsSystem().registerSource(this)
2. I used that counter (I could print out its value in the driver).
3. With CsvSink my counter is reported, but its value is always 0.
I have the following questions:
- I expect that the Codahale Counter is serialized along with my code, so
the object being incremented on the executors is no longer the counter I
registered. I have a version using an accumulator that works fine, but I am
a little worried about its performance (and design). Is there another way
of doing this? Maybe static fields?
- When running on YARN, how many sink objects will be created?
- If I create a singleton object and register its counter with Spark, the
counting is correct, but it is never reported from the executors. How do I
enable reporting from executors when running on YARN?
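To show what I mean by the static-field idea, here is a plain-Java sketch
(not Spark code; the class and field names are made up). When a closure is
serialized, each deserialized copy gets its own instance fields, so
increments never reach the originally registered object, while a static
field is one-per-JVM and is shared by all copies running in that JVM:

```java
import java.io.*;
import java.util.concurrent.atomic.AtomicLong;

class ErrorCounts implements Serializable {
    // Instance field: copied on serialization, so increments on the
    // deserialized copy never reach the original registered object.
    long instanceCount = 0;

    // Static field: one per JVM, shared by every deserialized copy
    // executing in that JVM (statics are not serialized at all).
    static final AtomicLong STATIC_COUNT = new AtomicLong();

    void recordError() {
        instanceCount++;
        STATIC_COUNT.incrementAndGet();
    }

    // Simulate shipping the object to an executor and back.
    static ErrorCounts roundTrip(ErrorCounts original) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ErrorCounts) in.readObject();
        }
    }
}

public class SerializationDemo {
    public static void main(String[] args) throws Exception {
        ErrorCounts registered = new ErrorCounts();            // "driver" copy
        ErrorCounts copy = ErrorCounts.roundTrip(registered);  // "executor" copy
        copy.recordError();
        copy.recordError();
        // The registered instance never sees the increments,
        // but the static counter does (same JVM in this demo).
        System.out.println(registered.instanceCount);
        System.out.println(ErrorCounts.STATIC_COUNT.get());
    }
}
```

In real Spark the driver and executors are separate JVMs, so a static
counter would count per-executor, which is why each executor would also
need its own registered Source to report it.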
My custom Source:
> import java.util.List;
> import com.codahale.metrics.MetricRegistry;
> import org.apache.spark.SparkEnv;
> import org.apache.spark.metrics.source.Source;
>
> public class CustomMonitoring implements Source {
>     private final MetricRegistry metricRegistry = new MetricRegistry();
>
>     public CustomMonitoring(List<String> counts) {
>         for (String count : counts) {
>             metricRegistry.counter(count);
>         }
>         SparkEnv.get().metricsSystem().registerSource(this);
>     }
>
>     @Override
>     public String sourceName() {
>         return TURBINE_CUSTOM_MONITORING; // constant defined elsewhere
>     }
>
>     @Override
>     public MetricRegistry metricRegistry() {
>         return metricRegistry;
>     }
> }
metrics.properties
> *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
> *.sink.csv.directory=/tmp/csvSink/
> *.sink.csv.period=60
> *.sink.csv.unit=seconds
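On the executor question, my understanding is that on YARN this
metrics.properties also has to be visible to the executor JVMs, e.g. by
shipping it with the job (the local path below is a placeholder):

```
spark-submit --files /path/to/metrics.properties \
    --conf spark.metrics.conf=metrics.properties ...
```

I am not sure this alone is enough to make executors report, so corrections
are welcome.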
Thank you,
Nicolae R.