Thank you for your answers! Fair points.

    Would you elaborate on what you are expecting the behaviour to look like?
    Ideally your runner would export gauges at a periodic interval.

I'd expect all gauge values reported by the SDK to also be reported by the Runner. Instead of reducing the gauge values immediately, we need to buffer them until they are reported. Afterwards we can go on to reduce them like we currently do.
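To make the idea concrete, here is a minimal Python sketch of "buffer until reported, then reduce". It is only an illustration under assumptions: `BufferedGauge`, `drain` and `reduce_latest` are hypothetical names, not existing Beam classes or functions.

```python
import time
from typing import List, Optional, Tuple

# (timestamp, value) pair; the layout is an assumption for this sketch.
Update = Tuple[float, int]


class BufferedGauge:
    """Hypothetical gauge cell that keeps every update until it is reported."""

    def __init__(self) -> None:
        self._pending: List[Update] = []

    def set(self, value: int, timestamp: Optional[float] = None) -> None:
        # Record the update instead of overwriting the previous one.
        ts = time.time() if timestamp is None else timestamp
        self._pending.append((ts, value))

    def drain(self) -> List[Update]:
        """Return all unreported updates and clear the buffer."""
        pending, self._pending = self._pending, []
        return pending


def reduce_latest(updates: List[Update]) -> Optional[Update]:
    """After reporting, reduce to the update with the largest timestamp."""
    return max(updates, key=lambda u: u[0]) if updates else None


gauge = BufferedGauge()
gauge.set(1, timestamp=1.0)
gauge.set(9, timestamp=2.0)
gauge.set(2, timestamp=3.0)

reported = gauge.drain()           # all three values reach the Runner
latest = reduce_latest(reported)   # then reduce as is done today
```

The point of the sketch is only the ordering: report first, reduce afterwards.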

It turns out that this poses some challenges on the Runner side. Metrics in many Runners, e.g. Flink, work on a "pull" basis, so there is no way to proactively push the metrics; we have to wait for them to be queried. I suppose the current approach suffices because, without the ability to push, it is impossible to know when to clean up. Flink does not even provide a callback to tell us when the metrics have been pulled.

There is also the case where a user might generate a ridiculous number of gauge values, which is also not ideal. Clocks could also be a problem if we rotated the SDK Harness.
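One possible guard against an unbounded number of gauge values would be to cap the buffer and count what gets dropped. This is a rough sketch only; `BoundedGaugeBuffer` and `max_pending` are hypothetical names and nothing like this exists in Beam today as far as I know.

```python
from collections import deque
from typing import Deque, List, Tuple


class BoundedGaugeBuffer:
    """Hypothetical buffer that keeps at most `max_pending` unreported updates."""

    def __init__(self, max_pending: int = 1000) -> None:
        # A deque with maxlen silently evicts the oldest entry when full.
        self._pending: Deque[Tuple[float, int]] = deque(maxlen=max_pending)
        self.dropped = 0

    def set(self, value: int, timestamp: float) -> None:
        if len(self._pending) == self._pending.maxlen:
            self.dropped += 1  # the oldest update is about to be evicted
        self._pending.append((timestamp, value))

    def drain(self) -> List[Tuple[float, int]]:
        """Return the retained updates and clear the buffer."""
        pending = list(self._pending)
        self._pending.clear()
        return pending


buf = BoundedGaugeBuffer(max_pending=3)
for i in range(5):
    buf.set(i, timestamp=float(i))

retained = buf.drain()
```

Dropping the oldest values is just one choice; sampling or pre-aggregating would be alternatives with different trade-offs.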

    Max, do you have a use case in mind?

I believe this is necessary to observe metrics correctly. Imagine that the "number of elements processed" metric fluctuates heavily, e.g. due to a hotspot in the keying. With the current behaviour, this "spike" would just be overwritten by a subsequent report.
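A small Python model of the problem, assuming the SDK reports three values between two Runner queries. `combine_latest` is a simplified stand-in for a "newest timestamp wins" combiner, not Beam's actual code, and the numbers are made up.

```python
from typing import List, Optional, Tuple

Update = Tuple[float, int]  # (timestamp, value); layout assumed for this sketch


def combine_latest(current: Optional[Update], update: Update) -> Update:
    """Keep whichever (timestamp, value) pair is newer."""
    if current is None or update[0] >= current[0]:
        return update
    return current


# Three SDK reports arrive before the Runner queries once; t=2 is the spike.
updates: List[Update] = [(1.0, 100), (2.0, 50_000), (3.0, 120)]

latest: Optional[Update] = None
for update in updates:
    latest = combine_latest(latest, update)

# What the Runner can see with "newest wins" versus with buffering.
seen_latest_only = [latest[1]]
seen_buffered = [value for _, value in updates]
```

With "newest wins" the Runner only ever sees 120, whereas buffering would let it see the 50,000 spike as well.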

Btw, my use case is reporting state cache hits/misses on a per-bundle basis.

Thanks,
Max

On 15.10.19 20:28, Pablo Estrada wrote:
There are cases where we may want to update a Gauge frequently, especially if it's from a single worker. The example I always think of is progress in a Kafka partition. This is consumed by a single worker at a time, and updated by a single worker at a time - and frequently updated too.

Max, do you have a use case in mind? I think what you propose is reasonable, but I can't think of a use case that requires updates much more frequent than every few seconds. I also worry about increasing the size of the SDK updates to a potentially huge amount (what if someone writes a DoFn that updates a gauge on every element?).

On Tue, Oct 15, 2019 at 10:23 AM Alex Amato <ajam...@google.com> wrote:

    Would you elaborate on what you are expecting the behaviour to look
    like? Ideally your runner would export gauges at a periodic interval.

    The design of gauge is inherently unable to handle multiple updates
    to it around the same time.

    Consider the case of multiple machines reporting the gauge at the
    same time. You can pick the one with the largest timestamp on each
    machine. But once these are reported to a central metric service,
    the service cannot compare the timestamps in a meaningful way, since
    they come from different machines with out-of-sync clocks. Racy
    threads can be an issue as well (multiple bundles reporting separate
    values for the gauge; the order is arbitrary, based on thread
    execution order, even though they run on the same machine).

    The current thinking around this, IIRC, is to try to document this
    and make it clear in the usage of gauge:

     1. Gauges should only be used for values which are updated
        infrequently.
     2. Different gauge values reported from different workers near the
        same time cannot be reliably aggregated together into a single,
        "most recent" value.






    On Tue, Oct 15, 2019 at 9:55 AM Maximilian Michels <m...@apache.org> wrote:

        Hi,

        While adding metrics for the Python state cache [1], I was
        wondering about the story of Gauges in Beam. It seems like we
        only keep a value at a time and use a combiner [2] that replaces
        an old, possibly not reported gauge result, with a newer gauge
        result based on their timestamps.

        This behavior is an issue because if the SDK reports faster than
        the Runner queries, metrics will just be swallowed. Gauges seem
        important to get right because often users want to see all the
        values, e.g. in case of spikes in the data.

        What do you think about keeping all gauge values until they are
        reported?

        Thanks,
        Max

        [1] https://github.com/apache/beam/pull/9769
        [2] https://github.com/apache/beam/blob/fa74467b82e78962e9f170ad0d95fa6b345add67/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L134
