Thank you for your answers! Fair points.
Would you elaborate on what you are expecting the behaviour to look like?
Ideally your runner would export gauges at a periodic interval.
I'd expect all gauge values reported by the SDK to also be reported by
the Runner. Instead of reducing the gauge values immediately, we need to
buffer them until they are reported. Afterwards we can go on to reduce
them like we currently do.
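To make the idea a bit more concrete, here is a rough Python sketch of "buffer until reported, then reduce". None of these names exist in Beam; `BufferingGauge`, `drain`, `latest`, and the `max_buffered` cap are hypothetical and only illustrate the shape of the proposal. The cap is my own assumption to keep memory bounded if a DoFn updates the gauge on every element.

```python
import threading
import time
from typing import List, NamedTuple, Optional


class GaugeSample(NamedTuple):
    value: int
    timestamp: float


class BufferingGauge:
    """Hypothetical gauge cell that keeps every value until it is reported.

    Not part of Beam's API; a sketch of the proposal only.
    """

    def __init__(self, max_buffered: int = 1000):
        self._lock = threading.Lock()
        self._buffer: List[GaugeSample] = []
        self._max_buffered = max_buffered  # assumed cap, to bound memory

    def set(self, value: int, timestamp: Optional[float] = None) -> None:
        ts = time.time() if timestamp is None else timestamp
        with self._lock:
            self._buffer.append(GaugeSample(value, ts))
            # Drop the oldest sample once the cap is exceeded.
            if len(self._buffer) > self._max_buffered:
                del self._buffer[0]

    def drain(self) -> List[GaugeSample]:
        """Return all buffered samples and clear the buffer (the 'report' step)."""
        with self._lock:
            samples, self._buffer = self._buffer, []
        return samples


def latest(samples: List[GaugeSample]) -> Optional[GaugeSample]:
    """Reduce as today: keep the sample with the newest timestamp."""
    return max(samples, key=lambda s: s.timestamp, default=None)
```

The reporting side would call `drain()` once per report and forward every sample; `latest()` stands in for the existing reduction, which would only run after the samples have been handed over.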
Turns out that this poses some challenges on the Runner side because
metrics in many Runners, e.g. Flink, work on a "pull" basis. There is
no way to proactively push the metrics; we have to wait for them to be
queried. I suppose the current approach suffices because, without the
ability to push, knowing when to clean up is impossible. There is not
even a callback in Flink to know when the metrics have been pulled.
There is also the case where a user might generate a ridiculous amount
of gauge values, which is also not ideal. Clocks could also be a problem
if we rotated the SDK Harness.
Max, do you have a use case in mind?
I believe this is necessary to correctly observe metrics. Imagine the
"number of elements processed" metric fluctuates heavily, e.g. due to a
hotspot in the keying. This "spike" would just be cancelled out by a
subsequent report.
Btw, my use case is reporting state cache hits/misses on a per-bundle basis.
Thanks,
Max
On 15.10.19 20:28, Pablo Estrada wrote:
There are cases where we may want to update a Gauge frequently.
Especially if it's from a single worker. The example I always think of
is progress in a Kafka partition. This is consumed by a single worker at
a time, and updated by a single worker at a time - and frequently
updated too.
Max, do you have a use case in mind? I think what you propose is
reasonable, but I can't think of a use case that requires updates
much more frequent than every few seconds.
I also worry about increasing the size of the SDK updates to a
potentially huge amount (what if someone writes a DoFn that updates a
gauge on every element?).
On Tue, Oct 15, 2019 at 10:23 AM Alex Amato <ajam...@google.com> wrote:
Would you elaborate on what you are expecting the behaviour to look
like? Ideally your runner would export gauges at a periodic interval.
The design of gauge is inherently unable to handle multiple updates
to it around the same time.
Consider the case of multiple machines reporting the gauge at the
same time. You can pick the one with the largest timestamp on each
machine. Then when reported to a central metric service, it cannot
compare timestamps in a meaningful way, since they come from
different machines with out-of-sync clocks. Racy threads can be an
issue as well (multiple bundles report separate values for the
gauge, and the order is arbitrary, depending on thread execution
order, even on the same machine).
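A small Python illustration of the clock problem, with made-up numbers. The `Report` type, the `stamp` helper, and the 5-second skew are all assumptions for the example, not anything from Beam. Worker B really updates the gauge last, but worker A's fast clock makes its older value look newest.

```python
from typing import NamedTuple


class Report(NamedTuple):
    worker: str
    value: int
    local_timestamp: float  # stamped with the worker's own clock


def stamp(worker: str, value: int, true_time: float, clock_offset: float) -> Report:
    """Build a report whose timestamp includes the worker's clock error."""
    return Report(worker, value, true_time + clock_offset)


# Worker A's clock runs 5 s ahead; worker B's clock is accurate.
a = stamp("A", 100, true_time=10.0, clock_offset=5.0)  # local timestamp 15.0
b = stamp("B", 200, true_time=12.0, clock_offset=0.0)  # local timestamp 12.0

# A central service that trusts timestamps picks A, even though B's
# update happened later in real time.
winner = max([a, b], key=lambda r: r.local_timestamp)
```

Here `winner` ends up being worker A's report, which is the stale value.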
The current thinking around this, IIRC, is to try to document this
and make it clear in the usage of gauge:
1. Gauges should only be used for values which are updated
infrequently.
2. Different gauge values reported from different workers near the
same time cannot be reliably aggregated together into a single,
"most recent" value.
On Tue, Oct 15, 2019 at 9:55 AM Maximilian Michels <m...@apache.org> wrote:
Hi,
While adding metrics for the Python state cache [1], I was wondering
about the story of Gauges in Beam. It seems like we only keep one value
at a time and use a combiner [2] that replaces an old, possibly not yet
reported gauge result with a newer gauge result based on their
timestamps.
This behavior is an issue because if the SDK reports faster than the
Runner queries, metrics will just be swallowed. Gauges seem important
to get right because often users want to see all the values, e.g. in
case of spikes in the data.
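To illustrate the swallowing, here is a simplified Python stand-in for a "newest timestamp wins" combiner. It is not the actual Java code behind [2]; `GaugeResult` and `combine_latest` are hypothetical names used only for this sketch.

```python
from typing import NamedTuple


class GaugeResult(NamedTuple):
    value: int
    timestamp: float


def combine_latest(old: GaugeResult, new: GaugeResult) -> GaugeResult:
    """Keep whichever result carries the later timestamp."""
    return new if new.timestamp >= old.timestamp else old


# Three SDK reports arrive before the Runner queries the metric once.
reports = [GaugeResult(10, 1.0), GaugeResult(900, 2.0), GaugeResult(12, 3.0)]

current = reports[0]
for report in reports[1:]:
    current = combine_latest(current, report)
```

When the Runner finally queries, `current` holds only the last report; the spike to 900 in the middle was never visible.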
What do you think about keeping all gauge values until they are reported?
Thanks,
Max
[1] https://github.com/apache/beam/pull/9769
[2] https://github.com/apache/beam/blob/fa74467b82e78962e9f170ad0d95fa6b345add67/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L134