Hello,
Plase provide more information as to how it is not working as expected.
Does it throw an exception, log a warning, is the metric
not get registered at all or does the value not changing?
On 06.07.2017 08:10, wyphao.2007 wrote:
Hi, all
I want to know element's latency before write to Elasticsearch, so I
registering a custom metrics as follow:
class CustomElasticsearchSinkFunction extends
ElasticsearchSinkFunction[EventEntry] {
private var metricGroup: Option[MetricGroup] = None
private var latency: Long = _
private def init(runtimeContext: RuntimeContext): Unit = {
if (metricGroup.isEmpty) {
metricGroup = Some(runtimeContext.getMetricGroup)
metricGroup.get.gauge[Long, Gauge[Long]]("esLatency",
ScalaGauge[Long](() => latency))
}
}
def createIndexRequest(element: EventEntry, runtimeContext:
RuntimeContext): IndexRequest = {
init(runtimeContext)
latency = System.currentTimeMillis() - element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
}
override def process(element: EventEntry,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit =
requestIndexer.add(createIndexRequest(element, runtimeContext))
}
but that does not seem to work, Does anyone know why?
Regards
wyp