Hello,

Plase provide more information as to how it is not working as expected.

Does it throw an exception, log a warning, is the metric
not get registered at all or does the value not changing?

On 06.07.2017 08:10, wyphao.2007 wrote:
Hi, all
I want to know element's latency before write to Elasticsearch, so I registering a custom metrics as follow:

class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _

  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}

but that does not seem to work, Does anyone know why?

Regards
wyp


Reply via email to