It looks like I was finally able to get the labeling behavior I was
looking for by simply storing a reference to the underlying MetricGroup
and keeping track of any new metrics that I needed to create dynamically
and use downstream:

import org.apache.flink.metrics.Counter
import org.apache.flink.metrics.MetricGroup
import java.io.Serializable

class MagicMetricRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Reference for all of the registered metrics, keyed by metric/tenant/source
    private val registeredMetrics: HashMap<String, Counter> = hashMapOf()

    // Increments a given metric for a tenant/source combination, lazily
    // registering the counter (with tenant/source groups) on first use
    fun inc(metric: String, tenant: String, source: String, amount: Long = 1) {
        // Build a unique key for this metric/tenant/source combination
        val key = "$metric-$tenant-$source"
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .addGroup("source", source)
                .counter(metric)
        }

        // Update the counter by the given amount
        registeredMetrics[key]!!.inc(amount)
    }
}

Then, within the open call of my KeyedProcessFunction, I simply stored a
reference to the registry and let it register any new tenant/source
combinations as they came in:

class MagicWindowFunction : KeyedProcessFunction<...>() {
    // Created in open() on the task manager, so it isn't serialized with the function
    @Transient private lateinit var metrics: MagicMetricRegistry

    override fun open(parameters: Configuration) {
        // Wrap the operator's metric group so counters can be registered lazily
        metrics = MagicMetricRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(...) {
        // Omitted for brevity

        // Registers the tenant/source combination on first use and increments it
        metrics.inc("logs_seen", "my-tenant", "my-source")
    }

    // Omitted for brevity
}

This appears to be working as expected as far as I can tell: I can see
all of the expected labels appearing in Prometheus and further downstream
in Grafana!
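
For completeness, since the per-task aggregation question came up earlier
in the thread: with these labels in place I can now sum across all of the
parallel tasks in Grafana with a query along these lines (the metric name
below is only illustrative; the actual name exported by the Prometheus
reporter will carry Flink's scope prefix):

    sum by (tenant, source) (logs_seen)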

Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks Dylan,
>
> Totally understandable. I already have the appropriate exporters /
> monitors in place for scraping metrics from Flink, including custom ones,
> into Prometheus. The labeling challenge is really the big one as while I
> see lots of labels for the metrics being exported (e.g. job id, worker,
> etc.) I didn’t see a mechanism to inject my own into those coming from
> Flink.
>
> Additionally, in my specific use case I’m dealing with a multi-tenant
> pipeline (I.e. reading messages from a single multi-tenant Kafka topic),
> which is where the labeling comes in. I’d love to be able to have a counter
> (among other types of metrics) with their appropriate labels for each
> tenant.
>
> I suppose I could implement a custom counter or series of counters (one
> for each tenant) that would each be responsible for keeping track of their
> own respective tenant values. In my case I’m dealing with a
> KeyedProcessFunction, so I only have access to the key (tenant) within the
> processElement function as opposed to when the function is initially
> opened, where I understand you would typically register a metric.
>
> Sorry for the somewhat convoluted response, I’m still getting accustomed
> to some of the Flink APIs, specifically around metrics.
>
> Thanks,
>
> Rion
>
> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com> wrote:
>
>
> Hi Rion,
>
> Regarding the question about adding Prometheus labels out of the box: this
> is a common ask of all exporters, but the Prometheus philosophy sees this as
> an "anti-pattern", as the metrics source can often be ambivalent about
> context. See [0] for an example of such a discussion.
>
> Instead, we can establish context during service discovery. If, for
> example, we run clusters for tenants on Kubernetes, then within the
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add
> the Kubernetes labels from the pods, such as "tenant-id: foo" and
> "environment: staging" to each incoming metric it processes.
>
> This isn't limited to Kubernetes; each of the service discovery configs is
> designed to accommodate translating metadata from context into metric labels.
>
> If this doesn't work for you, then consider encoding the tenant identifier
> into job names and extracting it with a metric_relabel_config [2].
>
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
>
>
> ------------------------------
> *From:* Rion Williams <rionmons...@gmail.com>
> *Sent:* Sunday, February 28, 2021 12:46 AM
> *To:* Prasanna kumar <prasannakumarram...@gmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Using Prometheus Client Metrics in Flink
>
> Hi Prasanna,
>
> Thanks for that. It’s what I was doing previously as a workaround, but I
> was just curious if there was any Flink-specific functionality to handle
> this before the metrics reach Prometheus.
>
> Additionally, from the docs on metrics [0], it seems that there’s a pattern
> in place for using supported third-party metrics, such as those from
> Codahale/DropWizard, via a Maven package (flink-metrics-dropwizard). I do
> see a similarly named package for Prometheus (flink-metrics-prometheus),
> which may be what I’m looking for, so I may give that a try.
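>
> From a quick look at the docs, it seems that reporter is enabled via
> flink-conf.yaml with something along these lines (untried on my end, so
> treat it as a sketch):
>
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 9249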
>
> Thanks,
>
> Rion
>
> [0]:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>
> On Feb 28, 2021, at 12:20 AM, Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>
> Rion,
>
> Regarding the second question, you can aggregate by using the sum function:
> sum(metric_name{job_name="JOBNAME"}). This works if you are using a counter
> metric.
>
> Prasanna.
>
> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
> Hi folks,
>
> I’ve just recently started working with Flink, and I was in the process of
> adding some metrics throughout my existing pipeline in the hopes of building
> some Grafana dashboards with them to help with observability.
>
> Initially I looked at the built-in Flink metrics that were available, but
> I didn’t see an easy mechanism for setting/using labels with them.
> Essentially, I have two properties for my messages coming through the
> pipeline that I’d like to be able to keep track of (tenant/source) across
> several metrics (e.g. total_messages with tenant / source labels, etc.). I
> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a
> good pattern for handling these.
>
> I had previously used the Prometheus client metrics [0] to accomplish this,
> but I wasn’t entirely sure how they would/could mesh with Flink. Does anyone
> have experience working with these, or know if they are supported?
>
> Secondly, when using the Flink metrics, I noticed I was receiving a
> separate metric for each task that was being spun up. Is there an “easy
> button” to handle aggregating these to ensure that a single metric (e.g.
> total_messages) reflects the total processed across all of the tasks
> instead of each individual one?
>
> Any recommendations / resources / advice would be greatly appreciated!
>
> Thanks,
>
> Rion
>
> [0] : https://prometheus.io/docs/instrumenting/clientlibs/
>
>