How are you verifying whether it is registered?

For the sake of covering all angles: Are you certain that createIndexRequest is called?
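
To make the second question concrete: with lazy registration as in the quoted code, the gauge only exists once the first element has actually been processed, so a sink that never receives an element never registers it. Below is a minimal, Flink-free sketch of that behavior. FakeMetricGroup and LatencyTracker are hypothetical stand-ins invented for illustration, not Flink classes, and the timestamps are made-up values.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metric group: it only records gauges by name.
final class FakeMetricGroup {
  private val gauges = mutable.Map.empty[String, () => Long]

  def gauge(name: String, read: () => Long): Unit = gauges.update(name, read)
  def isRegistered(name: String): Boolean = gauges.contains(name)
  def read(name: String): Option[Long] = gauges.get(name).map(_.apply())
}

// Mirrors the lazy-registration pattern from the question (assumed shape).
final class LatencyTracker {
  private var registered = false
  private var latency: Long = 0L
  var initCalls: Int = 0 // how often the registration branch actually ran

  def record(group: FakeMetricGroup, eventTimeMillis: Long, nowMillis: Long): Unit = {
    if (!registered) {
      // The gauge closes over `latency`, so it always reports the latest value.
      group.gauge("esLatency", () => latency)
      registered = true
      initCalls += 1
    }
    latency = nowMillis - eventTimeMillis
  }
}

object LazyGaugeCheck {
  def main(args: Array[String]): Unit = {
    val group = new FakeMetricGroup
    val tracker = new LatencyTracker

    // Nothing is registered until the first element arrives.
    println(s"registered before first element: ${group.isRegistered("esLatency")}")

    tracker.record(group, eventTimeMillis = 1000L, nowMillis = 1250L)
    tracker.record(group, eventTimeMillis = 2000L, nowMillis = 2100L)

    println(s"registered after elements: ${group.isRegistered("esLatency")}")
    println(s"registration ran ${tracker.initCalls} time(s)")
    println(s"current gauge value: ${group.read("esLatency")}")
  }
}
```

In the real job, a log statement inside init would serve the same purpose as initCalls here: it shows whether the registration branch is ever reached.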

On 06.07.2017 08:51, wyphao.2007 wrote:
Hi Chesnay, thank you for your reply.

The metric from the code above does not get registered at all.



On 2017-07-06 at 14:45, "Chesnay Schepler" <ches...@apache.org> wrote:


    Hello,

         Please provide more information as to how it is not working as
         expected.

         Does it throw an exception or log a warning? Is the metric
         not registered at all, or does its value not change?

         On 06.07.2017 08:10, wyphao.2007 wrote:
    Hi, all
    I want to know each element's latency before it is written to Elasticsearch, so I am registering a custom metric as follows:

    class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
      private var metricGroup: Option[MetricGroup] = None
      private var latency: Long = _

      private def init(runtimeContext: RuntimeContext): Unit = {
        if (metricGroup.isEmpty) {
          metricGroup = Some(runtimeContext.getMetricGroup)
          metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
        }
      }

      def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
        init(runtimeContext)
        latency = System.currentTimeMillis() - element.executeTime.getMillis
        Requests.indexRequest.index("test").`type`("event").source(element.json)
      }

      override def process(element: EventEntry,
                           runtimeContext: RuntimeContext,
                           requestIndexer: RequestIndexer): Unit =
        requestIndexer.add(createIndexRequest(element, runtimeContext))
    }

    But that does not seem to work. Does anyone know why?

    Regards
    wyp


