How are you verifying whether it is registered?
For the sake of covering all angles: Are you certain that
createPartitionIndex is called?
On 06.07.2017 08:51, wyphao.2007 wrote:
Hi Chesnay, thank you for your reply
The code above does not get registered at all.
在2017年07月06 14时45分, "Chesnay Schepler"<ches...@apache.org>写道:
Hello,
Plase provide more information as to how it is not working as
expected.
Does it throw an exception, log a warning, is the metric
not get registered at all or does the value not changing?
On 06.07.2017 08:10, wyphao.2007 wrote:
Hi, all
I want to know element's latency before write to
Elasticsearch, so I registering a custom metrics as follow:
class CustomElasticsearchSinkFunction extends
ElasticsearchSinkFunction[EventEntry] {
private var metricGroup: Option[MetricGroup] = None
private var latency: Long = _
private def init(runtimeContext: RuntimeContext): Unit = {
if (metricGroup.isEmpty) {
metricGroup = Some(runtimeContext.getMetricGroup)
metricGroup.get.gauge[Long, Gauge[Long]]("esLatency",
ScalaGauge[Long](() => latency))
}
}
def createIndexRequest(element: EventEntry, runtimeContext:
RuntimeContext): IndexRequest = {
init(runtimeContext)
latency = System.currentTimeMillis() -
element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
}
override def process(element: EventEntry,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer):
Unit =
requestIndexer.add(createIndexRequest(element,
runtimeContext))
}
but that does not seem to work, Does anyone know why?
Regards
wyp