Hi Jerry, thanks for your answer.

I had looked at MetricsSystem, but I couldn't see how I could use it in my
use case, which is:


    stream
        .map { i =>
          Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
          i * 2
        }

From what I can see, a Source accepts an object and describes how to poll
it for metrics. Presumably that's why Sources have only Gauges and never
Meters, for example. In my case, I don't have any state that I want Spark's
MetricsSystem to poll.

If I could get a reference to an internal metricRegistry instance AND a
task identifier in my functions, I could achieve the same thing while using
Spark's metric configuration, but I couldn't find a way to do this either...
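
To make the Gauge/Meter distinction above concrete, here is a minimal sketch
that uses the Dropwizard (Codahale) Metrics classes directly, outside Spark.
It assumes metrics-core is on the classpath. The object name GaugeVsMeter,
the queueDepth counter, and the metric names are hypothetical and exist only
for illustration. A Gauge wraps state that the registry reads when it is
polled, while a Meter has to be marked from the code path being measured:

```scala
import java.util.concurrent.atomic.AtomicLong

import com.codahale.metrics.{Gauge, Meter, MetricRegistry}

// Hypothetical illustration; not part of Spark or of the job above.
object GaugeVsMeter {
  val registry = new MetricRegistry()

  // Pull model: the state already exists, and the gauge only describes
  // how to read it whenever a reporter polls the registry.
  val queueDepth = new AtomicLong(0)
  registry.register("example.queueDepth", new Gauge[Long] {
    override def getValue: Long = queueDepth.get()
  })

  // Push model: there is no state to poll, so the measured code
  // has to call mark() itself.
  val processed: Meter = registry.meter("example.processed")

  def process(i: Int): Int = {
    processed.mark(i.toLong)
    i * 2
  }

  def main(args: Array[String]): Unit = {
    queueDepth.set(7)
    Seq(1, 2, 3).foreach(process)
    val gauge = registry.getGauges.get("example.queueDepth")
    println(s"gauge=${gauge.getValue} meterCount=${processed.getCount}")
  }
}
```

Running main prints the polled gauge value and the meter's accumulated count.
The process function has the same shape as the map closure in my job: the
meter is only useful if the closure can reach a registry on the executor.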








On Sun, Jan 4, 2015 at 2:46 AM, Shao, Saisai <saisai.s...@intel.com> wrote:

>  Hi,
>
>
>
> I think there’s a StreamingSource in Spark Streaming that exposes the
> Spark Streaming running status to the metrics sink. You can connect it to
> the Graphite sink to expose metrics to Graphite. I’m not sure whether this
> is what you want.
>
>
>
> Besides, you can write your own Source and Sink for the MetricsSystem and
> configure them in metrics.properties by class name so that the metrics
> system loads them. For details, refer to
> http://spark.apache.org/docs/latest/monitoring.html or the source code.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Enno Shioji [mailto:eshi...@gmail.com]
> *Sent:* Sunday, January 4, 2015 7:47 AM
> *To:* user@spark.apache.org
> *Subject:* Better way of measuring custom application metrics
>
>
>
> I have a hack to gather custom application metrics in a Streaming job, but
> I wanted to know if there is any better way of doing this.
>
>
>
> My hack consists of this singleton:
>
>
>
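> import java.lang.management.ManagementFactory
> import java.net.{InetAddress, InetSocketAddress, UnknownHostException}
> import java.util.concurrent.TimeUnit
>
> import com.codahale.metrics.MetricRegistry
> import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
>
> object Metriker extends Serializable {
>   // One registry per JVM, created lazily on first access.
>   @transient lazy val mr: MetricRegistry = {
>     val metricRegistry = new MetricRegistry()
>     val graphiteEndpoint = new InetSocketAddress(
>       "ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003)
>     // Report to Graphite every 5 seconds.
>     GraphiteReporter
>       .forRegistry(metricRegistry)
>       .build(new Graphite(graphiteEndpoint))
>       .start(5, TimeUnit.SECONDS)
>     metricRegistry
>   }
>
>   @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName
>
>   @transient lazy val hostId = {
>     try {
>       InetAddress.getLocalHost.getHostName
>     } catch {
>       case e: UnknownHostException => "localhost"
>     }
>   }
>
>   // Suffix the name with host and process so each JVM reports separately.
>   def metricName(name: String): String = {
>     "%s.%s.%s".format(name, hostId, processId)
>   }
> }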
>
>
>
>
>
> which I then use in my jobs like so:
>
>
>
>     stream
>         .map { i =>
>           Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
>           i * 2
>         }
>
>
>
> Then I aggregate the metrics on Graphite. This works, but I was curious to
> know if anyone has a less hacky way.
