Hi Jerry, thanks for your answer. I had looked at MetricsSystem, but I couldn't see how I could use it in my use case, which is:
    stream
      .map { i =>
        Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
        i * 2
      }

From what I can see, a Source accepts an object and describes how to poll it
for metrics. Presumably that's why Sources have only Gauges and never Meters,
for example. In my case, I don't have state that I want Spark's MetricsSystem
to poll. If I could get a reference to an internal MetricRegistry instance AND
a task identifier in my functions, I could achieve the same thing while using
Spark's metrics configuration, but I couldn't find a way to do that either.

On Sun, Jan 4, 2015 at 2:46 AM, Shao, Saisai <saisai.s...@intel.com> wrote:

> Hi,
>
> I think there's a StreamingSource in Spark Streaming which exposes the
> Spark Streaming running status to the metrics sink; you can connect it
> with a Graphite sink to expose metrics to Graphite. I'm not sure if this
> is what you want.
>
> Besides, you can customize the Source and Sink of the MetricsSystem to
> build your own, and configure it in metrics.properties with the class name
> so it is loaded by the metrics system. For details, you can refer to
> http://spark.apache.org/docs/latest/monitoring.html or the source code.
>
> Thanks,
> Jerry
>
> *From:* Enno Shioji [mailto:eshi...@gmail.com]
> *Sent:* Sunday, January 4, 2015 7:47 AM
> *To:* user@spark.apache.org
> *Subject:* Better way of measuring custom application metrics
>
> I have a hack to gather custom application metrics in a Streaming job,
> but I wanted to know if there is any better way of doing this.
> My hack consists of this singleton:
>
>     object Metriker extends Serializable {
>
>       @transient lazy val mr: MetricRegistry = {
>         val metricRegistry = new MetricRegistry()
>         val graphiteEndpoint = new InetSocketAddress(
>           "ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003)
>         GraphiteReporter
>           .forRegistry(metricRegistry)
>           .build(new Graphite(graphiteEndpoint))
>           .start(5, TimeUnit.SECONDS)
>         metricRegistry
>       }
>
>       @transient lazy val processId =
>         ManagementFactory.getRuntimeMXBean.getName
>
>       @transient lazy val hostId = {
>         try {
>           InetAddress.getLocalHost.getHostName
>         } catch {
>           case e: UnknownHostException => "localhost"
>         }
>       }
>
>       def metricName(name: String): String = {
>         "%s.%s.%s".format(name, hostId, processId)
>       }
>     }
>
> which I then use in my jobs like so:
>
>     stream
>       .map { i =>
>         Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
>         i * 2
>       }
>
> Then I aggregate the metrics on Graphite. This works, but I was curious
> to know if anyone has a less hacky way.
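For reference, the custom Source approach Jerry describes could look roughly like the sketch below. To keep it self-contained and compilable, Spark's `org.apache.spark.metrics.source.Source` trait and Dropwizard's `MetricRegistry`/`Meter` are replaced here by minimal stand-ins, and the names `AppMetricsSource`, `"myApp"`, and `"records"` are made up for illustration. The idea is that a Source owns its own registry, so a Meter registered there would be reported by whatever sinks metrics.properties configures, instead of each job building its own GraphiteReporter:

```scala
import scala.collection.mutable

// Stand-in for com.codahale.metrics.Meter: counts marked events.
class Meter {
  private var count = 0L
  def mark(n: Long): Unit = count += n
  def getCount: Long = count
}

// Stand-in for com.codahale.metrics.MetricRegistry: names -> meters.
class MetricRegistry {
  private val meters = mutable.Map.empty[String, Meter]
  def meter(name: String): Meter = meters.getOrElseUpdate(name, new Meter)
}

// Stand-in for org.apache.spark.metrics.source.Source: a named
// registry that the MetricsSystem polls and forwards to its sinks.
trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}

// Hypothetical application Source. Nothing stops a Source from
// holding a Meter, even though the built-in Sources mostly expose Gauges.
object AppMetricsSource extends Source {
  val sourceName = "myApp"
  val metricRegistry = new MetricRegistry
  def recordsMeter: Meter = metricRegistry.meter("records")
}
```

With the real APIs, the map closure would then just call `AppMetricsSource.recordsMeter.mark(i)`. The remaining gap, as noted above, is registering the Source with Spark's MetricsSystem from application code, which may not be part of the public API in every Spark version.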