It looks like I was finally able to get the expected labeling behavior I was looking for by storing a reference to the underlying MetricGroup and keeping track of any new metrics that I needed to create dynamically and use downstream:
import java.io.Serializable
import org.apache.flink.metrics.Counter
import org.apache.flink.metrics.MetricGroup

class MagicMetricRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Keeps a reference to all of the registered metrics
    private val registeredMetrics: HashMap<String, Counter> = hashMapOf()

    // Increments a given metric by key, registering it on first use
    fun inc(metric: String, tenant: String, source: String, amount: Long = 1) {
        // Build a composite key for this metric/tenant/source combination
        val key = "$metric-$tenant-$source"
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .addGroup("source", source)
                .counter(metric)
        }
        // Update the metric by the given amount
        registeredMetrics[key]!!.inc(amount)
    }
}

And then, within the open call of my KeyedProcessFunction, I stored a reference to the registry and registered any new tenant/source combinations as they came in:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction

class MagicWindowFunction : KeyedProcessFunction<...>() {
    @Transient private lateinit var metrics: MagicMetricRegistry

    override fun open(parameters: Configuration) {
        metrics = MagicMetricRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(...) {
        // Omitted for brevity
        metrics.inc("logs_seen", "my-tenant", "my-source")
    }

    // Omitted for brevity
}

This appears to be working as expected as far as I can tell. I can see all of the expected labels appearing within Prometheus and further downstream in Grafana!
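As a side note, the same lazy-registration pattern should extend to Flink's other metric types as well. Below is a minimal, untested sketch of a gauge variant — the MagicGaugeRegistry name and the AtomicLong backing store are just illustrative assumptions on my part, not code from the pipeline above:

import java.util.concurrent.atomic.AtomicLong
import org.apache.flink.metrics.Gauge
import org.apache.flink.metrics.MetricGroup

// Illustrative sketch only: lazily registers one AtomicLong-backed gauge
// per metric/tenant/source combination, mirroring the counter registry above.
class MagicGaugeRegistry(private val metricGroup: MetricGroup) {
    private val registeredGauges: HashMap<String, AtomicLong> = hashMapOf()

    // Sets a gauge value, registering the gauge on first use
    fun set(metric: String, tenant: String, source: String, value: Long) {
        val key = "$metric-$tenant-$source"
        val backing = registeredGauges.getOrPut(key) {
            val holder = AtomicLong()
            metricGroup
                .addGroup("tenant", tenant)
                .addGroup("source", source)
                .gauge(metric, Gauge<Long> { holder.get() })
            holder
        }
        backing.set(value)
    }
}

As with the counters, each tenant/source combination only registers its gauge once; subsequent calls just update the backing value that Flink reads when it reports.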
Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks Dylan,
>
> Totally understandable. I already have the appropriate exporters / monitors in place for scraping metrics from Flink, including custom ones, into Prometheus. The labeling challenge is really the big one: while I see lots of labels on the metrics being exported (e.g. job id, worker, etc.), I didn't see a mechanism to inject my own into those coming from Flink.
>
> Additionally, in my specific use case I'm dealing with a multi-tenant pipeline (i.e. reading messages from a single multi-tenant Kafka topic), which is where the labeling comes in. I'd love to be able to have a counter (among other types of metrics) with the appropriate labels for each tenant.
>
> I suppose I could implement a custom counter or a series of counters (one for each tenant) that would each be responsible for keeping track of their own respective tenant values. In my case I'm dealing with a KeyedProcessFunction, so I only have access to the key (tenant) within the processElement function, as opposed to when the function is initially opened, which is where I understand you would typically register a metric.
>
> Sorry for the somewhat convoluted response, I'm still getting accustomed to some of the Flink APIs, specifically around metrics.
>
> Thanks,
>
> Rion
>
> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <dylan.t.meiss...@nordstrom.com> wrote:
>
> Hi Rion,
>
> Regarding the question about adding Prometheus labels out of the box: this is a common ask of all exporters, but the Prometheus philosophy sees this as an "anti-pattern", as the metrics source can often be ambivalent about context. See [0] for an example of such a discussion.
>
> Instead, we can establish context during service discovery. If, for example, we run clusters for tenants on Kubernetes, then within the kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add the Kubernetes labels from the pods, such as "tenant-id: foo" and "environment: staging", to each incoming metric it processes.
>
> This isn't limited to Kubernetes; each of the service discovery configs is designed to accommodate translating metadata from context into metric labels.
>
> If this doesn't work for you, then consider encoding the tenant identifier into job names and extracting it with a metric_relabel_config [2].
>
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
>
> ------------------------------
> From: Rion Williams <rionmons...@gmail.com>
> Sent: Sunday, February 28, 2021 12:46 AM
> To: Prasanna kumar <prasannakumarram...@gmail.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Using Prometheus Client Metrics in Flink
>
> Hi Prasanna,
>
> Thanks for that. It's what I was doing previously as a workaround; however, I was just curious if there was any Flink-specific functionality to handle this prior to Prometheus.
>
> Additionally, from the docs on metrics [0], it seems that there's a pattern in place for using supported third-party metrics, such as those from Codahale/Dropwizard, via a Maven package (flink-metrics-dropwizard). I do see a similarly named package for Prometheus (flink-metrics-prometheus), which may be what I'm looking for, so I may give that a try.
>
> Thanks,
>
> Rion
>
> [0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>
> On Feb 28, 2021, at 12:20 AM, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
> Rion,
>
> Regarding the second question, you can aggregate by using the sum function: sum(metric_name{job_name="JOBNAME"}). This works if you are using the counter metric.
>
> Prasanna.
>
> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams <rionmons...@gmail.com> wrote:
>
> Hi folks,
>
> I've just recently started working with Flink and I was in the process of adding some metrics to my existing pipeline with the hope of building some Grafana dashboards with them to help with observability.
>
> Initially I looked at the built-in Flink metrics that were available, but I didn't see an easy mechanism for setting/using labels with them. Essentially, I have two properties on my messages coming through the pipeline that I'd like to keep track of (tenant/source) across several metrics (e.g. total_messages with tenant / source labels, etc.). I didn't see an easy way to adjust this out of the box, and I wasn't aware of a good pattern for handling it.
>
> I had used the Prometheus client metrics [0] to accomplish this in the past, but I wasn't entirely sure how they would/could mesh with Flink. Does anyone have experience in working with these, or know if they are supported?
>
> Secondly, when using the Flink metrics, I noticed I was receiving a separate metric for each task that was being spun up. Is there an "easy button" to handle aggregating these to ensure that a single metric (e.g. total_messages) reflects the total processed across all of the tasks instead of each individual one?
>
> Any recommendations / resources / advice would be greatly appreciated!
>
> Thanks,
>
> Rion
>
> [0]: https://prometheus.io/docs/instrumenting/clientlibs/