Ok, I see your problem. And yes, keeping a map of metrics should work. Just for double checking, I assume there's an upper bound of your map keys (table names)? Because if not, an infinitely increasing in-memory map that is not managed by Flink's state might become problematic.
Thank you~ Xintong Song On Fri, Jul 3, 2020 at 2:39 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > Seems there's no direct solution. > Perhaps i can implement this by initializing a HashMap<String, Meter> with > all the possible value of tableName in `open` mehtod and get the > corresponding Meter according to tableName in the `invoke` method. > > > Thanks, > Lei > ------------------------------ > wangl...@geekplus.com.cn > > > *Sender:* wangl...@geekplus.com.cn > *Send Time:* 2020-07-03 14:27 > *Receiver:* Xintong Song <tonysong...@gmail.com> > *cc:* user <user@flink.apache.org> > *Subject:* Re: Re: How to dynamically initialize flink metrics in invoke > method and then reuse it? > Hi Xintong, > > Yes, initializing the metric in the `open` method works, but it doesn't > solve my problem. > I want to initialize the metric with a name that is extracted from the > record content. Only in the `invoke` method i can do it. > > Actually my scenario is as follows. > The record is MySQL binlog info. I want to monitor the qps by tableName. > The tableName is different for every record. > > Thanks, > Lei > > > ------------------------------ > wangl...@geekplus.com.cn > > *Sender:* Xintong Song <tonysong...@gmail.com> > *Send Time:* 2020-07-03 13:14 > *Receiver:* wangl...@geekplus.com.cn > *cc:* user <user@flink.apache.org> > *Subject:* Re: How to dynamically initialize flink metrics in invoke > method and then reuse it? > Hi Lei, > > I think you should initialize the metric in the `open` method. Then you > can save the initialized metric as a class field, and update it in the > `invoke` method for each record. > > Thank you~ > > Xintong Song > > > > On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> >> In one flink operator, i want to initialize multiple flink metrics >> according to message content. >> As the code below. >> >> public void invoke(ObjectNode node, Context context) throws Exception { >> >> String tableName = node.get("metadata").get("topic").asText(); >> Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, >> new MeterView(10)); >> meter.markEvent(); >> log.info("### counter: " + meter.toString() + "\t" + >> meter.getCount()); >> >> >> But in this way every invoke call will initialize a new metrics and the >> count will be from zero again. >> How can i reuse the metric initialized before? >> >> Thanks, >> Lei >> ------------------------------ >> wangl...@geekplus.com.cn >> >>