Hi Chesnay, thank you for your reply

The metric in the code below does not get registered at all.

On 2017-07-06 at 14:45, "Chesnay Schepler" <ches...@apache.org> wrote:

Hello,

Please provide more information as to how it is not working as expected.

Does it throw an exception or log a warning, is the metric not registered at all, or does the value not change?
     
     On 06.07.2017 08:10, wyphao.2007 wrote:
   
        
       
Hi all,

I want to know each element's latency before it is written to Elasticsearch, so I am registering a custom metric as follows:
       

       
       
class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _

  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
      metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}
       

       
       
But that does not seem to work. Does anyone know why?
       

       
       
Regards
       
wyp
       

       
     
            


   
