Hi Chesnay,

Thanks for the prompt response and feedback; it's very much appreciated.
Please see my inline responses to your questions below:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything in the logs that seemed out of the ordinary. I'm
currently using a MiniClusterResource for this and tried setting the logging
level to capture everything (i.e. ALL), but if there's a way to expose more,
I'm not aware of it.

*Have you debugged the execution and followed the counter() calls all the
way to the reporter?*


Using the debugger, I traced one of the counter initializations, and it
seems that no reporters were found within the register() call in
MetricRegistryImpl (i.e. this.reporters contained no registered reporters):

if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) {
        MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
            (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);

        try {
            if (reporterAndSettings != null) {
                FrontMetricGroup front =
                    new FrontMetricGroup(reporterAndSettings.getSettings(), group);
                reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
            }
        } catch (Exception var11) {
            LOG.warn("Error while registering metric: {}.", metricName, var11);
        }
    }
}

Perhaps this is an error on my part, as I had assumed the following would
be sufficient to register my reporter (within a local/MiniCluster
environment):

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
    "MockCustomMetricsReporter." +
    ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flink = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)

However, the reporter is clearly being picked up for the built-in metrics,
since those do trigger the notifyOfAddedMetric() function within the
reporter itself. It's only the custom metrics that never show up there.
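Chesnay's note further down that there will be two reporter instances, along with his suggestion of a factory that returns a static reporter, could look roughly like the sketch below. It is deliberately Flink-free so it runs on its own: SketchReporter, SketchReporterFactory, and SharedTestReporter are hypothetical stand-ins. As far as I know, the real interface is org.apache.flink.metrics.reporter.MetricReporterFactory (createMetricReporter(Properties)), configured with a metrics.reporter.<name>.factory.class key instead of .class, and Flink may discover factories via ServiceLoader (a META-INF/services entry); all of that is an assumption worth checking against the Flink version in use.

```kotlin
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, Flink-free stand-ins for MetricReporter and MetricReporterFactory,
// reduced to what this sketch needs.
interface SketchReporter {
    fun notifyOfAddedMetric(identifier: String)
}

interface SketchReporterFactory {
    fun createMetricReporter(properties: Properties): SketchReporter
}

// One JVM-wide reporter: whether the JobManager or the TaskManager asks for a
// reporter, both receive this object, so a test reads a single shared store.
object SharedTestReporter : SketchReporter {
    // Concurrent set, since it is written by the cluster's threads and read by the test thread.
    val identifiers: MutableSet<String> = ConcurrentHashMap.newKeySet()

    override fun notifyOfAddedMetric(identifier: String) {
        identifiers.add(identifier)
    }
}

class SharedTestReporterFactory : SketchReporterFactory {
    // Returns the same instance on every call instead of creating a new reporter.
    override fun createMetricReporter(properties: Properties): SketchReporter = SharedTestReporter
}
```

The design choice is simply that the factory, not the cluster, decides instance identity, so the test never has to find "the right" reporter instance.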

*Do you only see JobManager metrics, or is there somewhere also something
about the TaskManager?*


Metrics appear to be coming from both the JobManager and the TaskManager,
judging by examples like these:

localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count
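Identifiers like these are the scope components joined with the default "." delimiter, followed by the metric name, whereas the name argument passed to notifyOfAddedMetric() is only that last part. Assuming addGroup("tenant", tenant) contributes both the key and the value as scope components, per-tenant counters share a name and differ only in scope, which is the collision Chesnay points out below. A minimal Flink-free sketch: identifier() is a hypothetical stand-in for MetricGroup.getMetricIdentifier(), and the scope values are purely illustrative.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for MetricGroup.getMetricIdentifier(name): the scope
// components plus the metric name, joined with the default "." delimiter.
fun identifier(scope: List<String>, name: String): String =
    (scope + name).joinToString(".")

// Illustrative operator scope; the tenant group adds both its key and its value.
val operatorScope = listOf("localhost", "taskmanager", "tm-1", "my-job", "process", "0")
val tenant1Scope = operatorScope + listOf("tenant", "tenant1")
val tenant2Scope = operatorScope + listOf("tenant", "tenant2")

// Keyed by the bare name, as MockCustomMetricsReporter does: tenant2's counter is dropped.
val byName = ConcurrentHashMap<String, String>().apply {
    putIfAbsent("events", "tenant1 counter")
    putIfAbsent("events", "tenant2 counter")
}

// Keyed by the full identifier: both counters are kept. A ConcurrentHashMap also
// tolerates the test thread reading while the cluster's threads are still writing.
val byIdentifier = ConcurrentHashMap<String, String>().apply {
    putIfAbsent(identifier(tenant1Scope, "events"), "tenant1 counter")
    putIfAbsent(identifier(tenant2Scope, "events"), "tenant2 counter")
}
```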

I do agree that a factory implementation returning a static reporter would
likely be a better approach, so I may explore that a bit more, along with
making some changes to the existing (admittedly hacky) implementation for
handling the dynamic metrics. I did see several references to a
MetricRegistry class, but I wasn't sure whether that was the most
appropriate place to add this type of functionality, or whether it was
needed at all.
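On the dynamic-metrics side, one change worth considering: CustomMetricsRegistry keeps its cache in a companion object, so the cache is shared by every parallel instance in the JVM, and an instance that finds a counter already cached never calls counter() on its own metric group. A minimal sketch of a per-instance cache instead; SimpleCounter, TenantCounters, and the register lambda are hypothetical stand-ins for Flink's Counter and for metricGroup.addGroup("tenant", tenant).counter(metric).

```kotlin
// Hypothetical stand-in for Flink's Counter so the sketch runs on its own.
class SimpleCounter {
    var count: Long = 0
        private set

    fun inc(n: Long = 1) {
        count += n
    }
}

// Per-instance cache (no companion object): each parallel instance registers
// counters on its own metric group, at most once per (metric, tenant) pair.
// Assumes, as I understand Flink's model, that processElement() for a given
// instance runs on a single task thread, so a plain HashMap suffices here.
class TenantCounters(
    // Stand-in for metricGroup.addGroup("tenant", tenant).counter(metric).
    private val register: (metric: String, tenant: String) -> SimpleCounter
) {
    private val counters = HashMap<Pair<String, String>, SimpleCounter>()

    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Register lazily on first use, then increment the cached counter.
        counters.getOrPut(metric to tenant) { register(metric, tenant) }.inc(amount)
    }
}
```

In a ProcessFunction, an instance of this would be created in open() from runtimeContext.metricGroup, as the existing code already does with CustomMetricsRegistry.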

Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler wrote:

> Was there anything in the logs (ideally on debug)?
> Have you debugged the execution and followed the counter() calls all the
> way to the reporter?
> Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?
>
> I can see several issues with your code, but none that would fully explain
> the issue:
>
> a) your reporter is not thread-safe
> b) you only differentiate metrics by name, which will lead to quite a few
> collisions.
>
> Also be aware that there will be 2 reporter instances: one for the JM and
> one for the TM.
> To remedy this, I would recommend creating a factory that returns a static
> reporter instance instead; overall this tends to be cleaner.
>
> Alternatively, IIRC, when using the testing harnesses you can also set
> a custom MetricGroup implementation.
>
> On 3/16/2021 4:13 AM, Rion Williams wrote:
>
> Hi all,
>
> Recently, I was working on adding some custom metrics to a Flink job that
> required the use of dynamic labels (i.e. capturing various counters that
> were "sliceable" by things like tenant / source, etc.).
>
> I ended up handling it in a very naive fashion that would just keep a
> dictionary of metrics that had already been registered and update them
> accordingly which looked something like this:
>
> class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
>     private lateinit var metrics: CustomMetricsRegistry
>
>     override fun open(parameters: Configuration) {
>         metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>     }
>
>     override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>         // Insert calls like metrics.inc("metric-name", "tenant-name", 4) here
>     }
> }
>
> class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
>     // Increments a given metric by key
>     fun inc(metric: String, tenant: String, amount: Long = 1) {
>         // Store a key for the metric
>         val key = "$metric-$tenant"
>
>         // Store/register the metric
>         if (!registeredMetrics.containsKey(key)) {
>             registeredMetrics[key] = metricGroup
>                 .addGroup("tenant", tenant)
>                 .counter(metric)
>         }
>
>         // Update the metric by a given amount
>         registeredMetrics[key]!!.inc(amount)
>     }
>
>     companion object {
>         private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>     }
> }
>
> This basically registers and updates new metrics for tenants as they are
> encountered, and I've seen them emitted as expected by hitting the
> appropriately configured metrics endpoint (using a PrometheusReporter).
>
> However, while trying to write a few unit tests for this, I ran into an
> issue. I was following a Stack Overflow post answered by @Chesnay
> Schepler [0] that described the use of an in-memory/embedded Flink
> cluster and a custom reporter that would statically expose the underlying
> metrics.
>
> So I took a shot at implementing something similar as follows:
>
> *Flink Cluster Definition*
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>     ConfigConstants.METRICS_REPORTER_PREFIX +
>     "MockCustomMetricsReporter." +
>     ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
> ))
>
> @ClassRule
> @JvmField
> val flinkCluster = MiniClusterResource(
>     MiniClusterResourceConfiguration.Builder()
>         .setConfiguration(metricsConfiguration)
>         .setNumberTaskManagers(1)
>         .setNumberSlotsPerTaskManager(1)
>         .build()
> )
>
> *Custom Reporter*
>
> class MockCustomMetricsReporter : MetricReporter {
>
>     override fun open(metricConfig: MetricConfig) {}
>     override fun close() {}
>     override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>         // Store the metrics that are being registered as we see them
>         if (!registeredCustomMetrics.containsKey(name)) {
>             registeredCustomMetrics[name] = metric
>         }
>     }
>
>     override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>         // Do nothing here
>     }
>
>     companion object {
>         // Static reference to metrics as they are registered
>         var registeredCustomMetrics = HashMap<String, Metric>()
>     }
> }
>
> *Example Test*
>
> @Test
> fun `Example Metrics Use Case`() {
>     // Arrange
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>     val events = listOf(
>         eventWithUsers("tenant1", "[email protected]"),
>         eventWithUsers("tenant2", "[email protected]"),
>     )
>
>     // Act
>     stream
>         .fromCollection(events)
>         .process(MyCustomProcessFunction())
>
>     // Assert
>     stream.execute()
>     assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
> }
>
> While this test passes, *the problem is that the custom metrics
> defined dynamically (via the CustomMetricsRegistry implementation) do not
> appear within the registeredCustomMetrics collection*. In fact, 21 metrics
> get registered, but all of them appear to be standard out-of-the-box
> metrics (CPU usage, number of TaskManagers, load, and various other Netty
> and JVM stats); no custom metrics are included.
>
> I've tried multiple configurations, implementations via a custom
> TestHarness, etc., but for some reason the custom metrics never trigger
> the notifyOfAddedMetric function, which would be responsible for adding
> them to the static collection to be asserted against.
>
> Any ideas / guidance would be more than welcome. Perhaps a different
> approach? Based on the examples I've encountered, the code seems like it
> should "just work".
>
> Thanks much,
>
> Rion
>
> [0] :
> https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>
>
>
>
