In this case, I was using a harness to test the function. Honestly, though, I
couldn't care less about the metrics surrounding the unit tests; I'm much more
concerned with having something that will actually run and work as intended
within a job. The only real concern I have, or problem I want to solve,
is building metrics that may vary based on the incoming data from a
"label" perspective (e.g. keeping track of the events I've seen for a given
tenant, or some other property).
Something like:
<metric prefix>_events_seen { tenant = "tenant-1" } 1.0
<metric prefix>_events_seen { tenant = "tenant-2" } 200.0
If that makes sense. I've used the Prometheus client previously to
accomplish these types of metrics, but since I'm fairly new to the Flink
world, I was trying to use the built-in constructs available (thus the
dynamic groups / metrics being added).
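For anyone following along, the per-label counters I have in mind behave
roughly like this framework-free Java sketch (the class and method names are
made up for illustration; no Flink or Prometheus types involved):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the labelled-counter idea: one logical metric
// ("events_seen") fanned out into one counter per tenant label value.
public class EventsSeen {
    private final Map<String, LongAdder> byTenant = new ConcurrentHashMap<>();

    // Increment the counter for the given tenant, creating it on first sight.
    public void inc(String tenant, long amount) {
        byTenant.computeIfAbsent(tenant, t -> new LongAdder()).add(amount);
    }

    public long get(String tenant) {
        LongAdder counter = byTenant.get(tenant);
        return counter == null ? 0L : counter.sum();
    }

    public static void main(String[] args) {
        EventsSeen metric = new EventsSeen();
        metric.inc("tenant-1", 1);
        metric.inc("tenant-2", 200);
        // Render in a Prometheus-like exposition format.
        metric.byTenant.forEach((tenant, count) ->
            System.out.println("events_seen{tenant=\"" + tenant + "\"} " + count.sum()));
    }
}
```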
On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <[email protected]> wrote:
> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
>
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much appreciated.
> Please see the inline responses below to your questions:
>
> *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything out
> of the ordinary. I'm currently using a MiniClusterResource for this and
> attempted to set the logging levels to pick up everything (i.e. ALL), but
> if there's a way to expose more, I'm not aware of it.
>
> *Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations, and it
> seems that no reporters were being found within the register call in
> MetricRegistryImpl (i.e. this.reporters has no registered reporters):
>
>     if (this.reporters != null) {
>         for (int i = 0; i < this.reporters.size(); ++i) {
>             MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
>                 (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);
>
>             try {
>                 if (reporterAndSettings != null) {
>                     FrontMetricGroup front = new FrontMetricGroup(
>                         reporterAndSettings.getSettings(), group);
>                     reporterAndSettings.getReporter().notifyOfAddedMetric(
>                         metric, metricName, front);
>                 }
>             } catch (Exception var11) {
>                 LOG.warn("Error while registering metric: {}.", metricName, var11);
>             }
>         }
>     }
>
> Perhaps this is an error on my part as I had assumed the following would
> be sufficient to register my reporter (within a local / minicluster
> environment):
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>     ConfigConstants.METRICS_REPORTER_PREFIX +
>         "MockCustomMetricsReporter." +
>         ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
>         MockCustomMetricsReporter::class.java.name
> ))
>
> @ClassRule
> @JvmField
> val flink = MiniClusterResource(
>     MiniClusterResourceConfiguration.Builder()
>         .setConfiguration(metricsConfiguration)
>         .setNumberTaskManagers(1)
>         .setNumberSlotsPerTaskManager(1)
>         .build()
> )
>
> However, the reporter is clearly being recognized, since the built-in
> metrics do trigger its notifyOfAddedMetric() function; it's just the
> custom metrics being registered that never do.
>
> *Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and
> TaskManagers from the following examples that were coming out:
>
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>
> I do agree that a factory implementation with a static reporter would
> likely be a better approach, so I may explore that a bit more, along with
> some changes to the existing, albeit crude, implementation for handling
> the dynamic metrics. I did see several references to a MetricRegistry
> class; however, I wasn't sure if that was the most appropriate place to
> add this type of functionality, or if it was needed at all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <[email protected]>
> wrote:
>
>> Was there anything in the logs (ideally on debug)?
>> Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?
>> Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?
>>
>> I can see several issues with your code, but none that would fully
>> explain the issue:
>>
>> a) your reporter is not thread-safe
>> b) you only differentiate metrics by name, which will lead to quite a few
>> collisions.
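A hedged, framework-free Java sketch of how both issues might be addressed
(the names are illustrative, not Flink API): keying by a (metric, tenant)
value object instead of the joined string "$metric-$tenant" avoids collisions
such as metric "a-b" / tenant "c" versus metric "a" / tenant "b-c", and
ConcurrentHashMap.computeIfAbsent makes registration atomic across threads.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Thread-safe registry keyed by a (metric, tenant) composite key rather
// than a delimiter-joined string, so distinct pairs can never collide.
public class SafeRegistry {
    // Composite key with equals/hashCode over both fields.
    static final class Key {
        final String metric, tenant;
        Key(String metric, String tenant) { this.metric = metric; this.tenant = tenant; }
        @Override public boolean equals(Object o) {
            return o instanceof Key
                && ((Key) o).metric.equals(metric)
                && ((Key) o).tenant.equals(tenant);
        }
        @Override public int hashCode() { return Objects.hash(metric, tenant); }
    }

    private final Map<Key, LongAdder> counters = new ConcurrentHashMap<>();

    // computeIfAbsent registers the counter atomically on first use.
    public void inc(String metric, String tenant, long amount) {
        counters.computeIfAbsent(new Key(metric, tenant), k -> new LongAdder()).add(amount);
    }

    public long get(String metric, String tenant) {
        LongAdder c = counters.get(new Key(metric, tenant));
        return c == null ? 0L : c.sum();
    }
}
```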
>>
>> Be also aware that there will be 2 reporter instances; one for the JM and
>> one for the TM.
>> To remedy this, I would recommend creating a factory that returns a
>> static reporter instance instead; overall this tends to be cleaner.
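The factory-with-a-static-instance idea can be sketched framework-free like
this (class and method names are stand-ins; in Flink, if I recall correctly,
the real hook is a MetricReporterFactory wired up via the
metrics.reporter.&lt;name&gt;.factory.class option):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "factory returning a static reporter instance": however many
// times the runtime asks for a reporter (once for the JM, once for the TM),
// every component shares the same object, so tests can assert against it.
public class InMemoryReporter {
    // The single shared instance, visible to test assertions.
    public static final InMemoryReporter INSTANCE = new InMemoryReporter();

    private final Map<String, Object> metrics = new HashMap<>();

    public synchronized void notifyOfAddedMetric(String name, Object metric) {
        metrics.put(name, metric);
    }

    public synchronized int size() {
        return metrics.size();
    }

    // The factory hook the runtime would call; always hands back INSTANCE.
    public static InMemoryReporter create() {
        return INSTANCE;
    }
}
```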
>>
>> Alternatively, when using the testing harnesses, IIRC you can also set
>> a custom MetricGroup implementation.
>>
>> On 3/16/2021 4:13 AM, Rion Williams wrote:
>>
>> Hi all,
>>
>> Recently, I was working on adding some custom metrics to a Flink job that
>> required the use of dynamic labels (i.e. capturing various counters that
>> were "sliceable" by things like tenant, source, etc.).
>>
>> I ended up handling it in a fairly naive fashion: just keeping a
>> dictionary of metrics that had already been registered and updating them
>> accordingly, which looked something like this:
>>
>> class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
>>     private lateinit var metrics: CustomMetricsRegistry
>>
>>     override fun open(parameters: Configuration) {
>>         metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>>     }
>>
>>     override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>>         // Insert calls like metrics.inc("events_seen", "tenant-name", 4) here
>>     }
>> }
>> class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
>>     // Increments a given metric by key
>>     fun inc(metric: String, tenant: String, amount: Long = 1) {
>>         // Store a key for the metric
>>         val key = "$metric-$tenant"
>>
>>         // Store/register the metric
>>         if (!registeredMetrics.containsKey(key)) {
>>             registeredMetrics[key] = metricGroup
>>                 .addGroup("tenant", tenant)
>>                 .counter(metric)
>>         }
>>
>>         // Update the metric by a given amount
>>         registeredMetrics[key]!!.inc(amount)
>>     }
>>
>>     companion object {
>>         private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>>     }
>> }
>>
>> Basically registering and updating new metrics for tenants as they are
>> encountered, which I've seen being emitted as expected via hitting the
>> appropriately configured metrics endpoint (using a PrometheusReporter).
>>
>> However, while I was trying to write a few unit tests for this, I seemed
>> to encounter an issue. I was following a Stack Overflow post that was
>> answered by @Chesnay Schepler <[email protected]> [0] that described
>> the use of an in-memory/embedded Flink cluster and a custom reporter that
>> would statically expose the underlying metrics.
>>
>> So I took a shot at implementing something similar as follows:
>>
>> *Flink Cluster Definition*
>>
>> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>>     ConfigConstants.METRICS_REPORTER_PREFIX +
>>         "MockCustomMetricsReporter." +
>>         ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
>>         MockCustomMetricsReporter::class.java.name
>> ))
>>
>> @ClassRule
>> @JvmField
>> val flinkCluster = MiniClusterResource(
>>     MiniClusterResourceConfiguration.Builder()
>>         .setConfiguration(metricsConfiguration)
>>         .setNumberTaskManagers(1)
>>         .setNumberSlotsPerTaskManager(1)
>>         .build()
>> )
>>
>> *Custom Reporter*
>>
>> class MockCustomMetricsReporter : MetricReporter {
>>
>>     override fun open(metricConfig: MetricConfig) {}
>>     override fun close() {}
>>
>>     override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Store the metrics that are being registered as we see them
>>         if (!registeredCustomMetrics.containsKey(name)) {
>>             registeredCustomMetrics[name] = metric
>>         }
>>     }
>>
>>     override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Do nothing here
>>     }
>>
>>     companion object {
>>         // Static reference to metrics as they are registered
>>         var registeredCustomMetrics = HashMap<String, Metric>()
>>     }
>> }
>>
>> *Example Test*
>>
>> @Test
>> fun `Example Metrics Use Case`() {
>>     // Arrange
>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>     val events = listOf(
>>         eventWithUsers("tenant1", "[email protected]"),
>>         eventWithUsers("tenant2", "[email protected]"),
>>     )
>>
>>     // Act
>>     stream
>>         .fromCollection(events)
>>         .process(MyCustomProcessFunction())
>>
>>     // Assert
>>     stream.execute()
>>     assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>> }
>>
>> While this test will pass, *the problem is that the custom metrics
>> defined dynamically (via the CustomMetricsRegistry implementation) do not
>> appear within the registeredCustomMetrics collection*. In fact, there
>> are 21 metrics that get registered but all of them appear to be classic
>> out-of-the-box metrics such as CPU usage, number of task managers, load,
>> various other Netty and JVM stats, but no custom metrics are included.
>>
>> I've tried multiple different configurations, implementations via a
>> custom TestHarness, etc. but for some reason the custom metrics being
>> defined are never triggering the notifyOfAddedMetric function which
>> would be responsible for adding them to the static collection to be
>> asserted against.
>>
>> Any ideas / guidance would be more than welcome. Perhaps a different
>> approach? Based off examples I've encountered, the code seems like it
>> should "just work".
>>
>> Thanks much,
>>
>> Rion
>>
>> [0] :
>> https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>