Is there a way to leverage runners' existing metrics sinks? As Amit and Stas mentioned, the Spark runner uses Spark's metrics sink to report Beam's aggregators and metrics. Other runners may have a similar capability, but I'm not sure. This could remove the need for a plugin, and for dealing with push vs. pull. I'm assuming we should compile a table of what can be supported by each runner in this area and then decide how to move forward?
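To make the comparison concrete, the baseline every runner should already offer is the metrics query API on PipelineResult. A rough sketch of the "pull" path Ben describes below (Java SDK; method names may differ slightly between SDK versions, so treat this as illustrative):

import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

/** Periodically pulls all metrics from a (possibly still running) pipeline. */
public class MetricsPoller {
  public static void poll(PipelineResult result, long periodSeconds) throws InterruptedException {
    while (true) {
      MetricQueryResults metrics =
          result.metrics().queryMetrics(MetricsFilter.builder().build());
      for (MetricResult<Long> counter : metrics.counters()) {
        // committed() may be unsupported on some runners; attempted() is the lower bar.
        System.out.println(
            counter.name() + " [" + counter.step() + "] attempted=" + counter.attempted());
      }
      if (result.getState().isTerminal()) {
        return;
      }
      TimeUnit.SECONDS.sleep(periodSeconds);
    }
  }
}

Whatever a runner can do natively (like Spark's metrics sink) would then be an extra column on top of that baseline in the table.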
On Sat, Feb 18, 2017 at 6:35 PM Jean-Baptiste Onofré <[email protected]> wrote:

> Good point.
>
> In Decanter, it's what I named a "scheduled collector". So, yes, the adapter will periodically harvest metrics to push.
>
> Regards
> JB
>
> On 02/18/2017 05:30 PM, Amit Sela wrote:
> > First issue with a "push" metrics plugin - what if the runner's underlying reporting mechanism is "pull"? Codahale's ScheduledReporter will sample the values every X and send them to ... So any runner using a "pull-like" mechanism would use an adapter?
> >
> > On Sat, Feb 18, 2017 at 6:27 PM Jean-Baptiste Onofré <[email protected]> wrote:
> >
> >> Hi Ben,
> >>
> >> ok, it's what I thought. Thanks for the clarification.
> >>
> >> +1 for the plugin-like "push" API (it's what I have in mind too ;)). I will start a PoC for discussion next week.
> >>
> >> Regards
> >> JB
> >>
> >> On 02/18/2017 05:17 PM, Ben Chambers wrote:
> >>> The runner can already report metrics during pipeline execution, so it is usable for monitoring.
> >>>
> >>> The pipeline result can be used to query metrics during pipeline execution, so a first version of reporting to other systems is to periodically pull metrics from the runner with that API.
> >>>
> >>> We may eventually want to provide a plugin-like API to get the runner to push metrics more directly to other metrics stores. This layer needs some thought since it has to handle the complexity of attempted/committed metrics to be consistent with the model.
> >>>
> >>> On Sat, Feb 18, 2017, 5:44 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>
> >>> Hi Amit,
> >>>
> >>> before Beam, I didn't mind about portability ;) So I used the Spark approach.
> >>>
> >>> But now, as a Beam user, I would expect a generic way to deal with metrics whatever the runner would be.
> >>>
> >>> Today, you are right: I'm using the solution provided by the execution engine. That's the current approach and it works fine. And it's up to me to leverage it (for instance Accumulators) with my own system.
> >>>
> >>> My thought is more to provide a generic way. It's only a discussion for now ;)
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 02/18/2017 02:38 PM, Amit Sela wrote:
> >>>> On Sat, Feb 18, 2017 at 10:16 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>
> >>>>> Hi Amit,
> >>>>>
> >>>>> my point is: how do we provide metrics today to end users, and how can they use them to monitor a running pipeline?
> >>>>>
> >>>>> Clearly the runner is involved, but it should behave the same way for all runners. Let me take an example. In my ecosystem, I'm using both Flink and Spark with Beam, some pipelines on each. I would like to get the metrics for all pipelines into my monitoring backend. If I can "poll" from the execution engine's metrics backend into my system, that's acceptable, but it's an overhead of work. Having a generic metric reporting layer would allow us to have a more common way: if the user doesn't provide any reporting sink, we use the execution backend's metric layer; if provided, we use the reporting sink.
> >>>>>
> >>>> How did you do it before Beam? I recall that for Spark you reported its native metrics via a Codahale Reporter and Accumulators were visible in the UI, and the Spark runner took it a step further to make it all visible via Codahale.
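On Amit's pull vs. push question: since both Spark's reporting and a possible Beam plugin would likely end up speaking Codahale anyway, the adapter could be as thin as registering the queried Beam values as Gauges, so any existing ScheduledReporter (push or pull) can ship them. A sketch, assuming the Dropwizard Metrics 3.x API and the current Beam metrics query API; the metric names and the ConsoleReporter are only placeholders:

import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

/** Bridges a Beam counter into a Codahale registry so an existing reporter can ship it. */
public class BeamToCodahaleAdapter {
  public static void register(MetricRegistry registry, PipelineResult result,
      String namespace, String name) {
    MetricsFilter filter = MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named(namespace, name))
        .build();
    // The Gauge re-queries the runner on every report: a "pull" adapter sitting
    // in front of whatever push-style ScheduledReporter is configured.
    registry.register(namespace + "." + name, (Gauge<Long>) () -> {
      for (MetricResult<Long> counter : result.metrics().queryMetrics(filter).counters()) {
        return counter.attempted();
      }
      return 0L;
    });
  }

  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    // register(registry, result, "my.namespace", "elementsProcessed");
    ConsoleReporter.forRegistry(registry).build().start(10, TimeUnit.SECONDS);
  }
}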
> >>>> Assuming Flink does something similar, it all belongs to runner setup/configuration.
> >>>>
> >>>>> About your question: you are right, it's possible to update a collector or appender without impacting anything else.
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 02/17/2017 10:38 PM, Amit Sela wrote:
> >>>>>> @JB I think what you're suggesting is that Beam should provide a "Metrics Reporting" API as well, and I used to think like you, but the more I thought about it the more I tend to disagree now.
> >>>>>>
> >>>>>> The SDK is for users to author pipelines, so Metrics are for user-defined metrics (in contrast to runner metrics).
> >>>>>>
> >>>>>> The Runner API is supposed to help different backends integrate with Beam so users can execute those pipelines on their favourite backend. I believe the Runner API has to provide restrictions/demands that are just enough so a runner can execute a Beam pipeline as best it can, and I'm afraid this would demand runner authors to do work that is unnecessary. This is also sort of "crossing the line" into the runner's domain and "telling it how to do" instead of what, and I don't think we want that.
> >>>>>>
> >>>>>> I do believe however that runners should integrate the Metrics into their own metrics reporting system - but that's for the runner author to decide. Stas did this for the Spark runner because Spark doesn't report back user-defined Accumulators (Spark's Aggregators) to its Metrics system.
> >>>>>>
> >>>>>> On a curious note though, did you use an OSGi service per event-type? So you can upgrade specific event-handlers without taking down the entire reporter? But that's really unrelated to this thread :-) .
> >>>>>>
> >>>>>> On Fri, Feb 17, 2017 at 8:36 PM Ben Chambers <[email protected]> wrote:
> >>>>>>
> >>>>>>> I don't think it is possible for there to be a general mechanism for pushing metrics out during the execution of a pipeline. The Metrics API suggests that metrics should be reported as values across all attempts and values across only successful attempts. The latter requires runner involvement to ensure that a given metric value is atomically incremented (or checkpointed) when the bundle it was reported in is committed.
> >>>>>>>
> >>>>>>> Aviem has already implemented Metrics support for the Spark runner. I am working on support for the Dataflow runner.
> >>>>>>>
> >>>>>>> On Fri, Feb 17, 2017 at 7:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi guys,
> >>>>>>>
> >>>>>>> As I'm back from vacation, I'm back on this topic ;)
> >>>>>>>
> >>>>>>> It's a great discussion, and I think the Metric IO coverage is good.
> >>>>>>>
> >>>>>>> However, there's a point that we discussed very quickly in the thread and I think it's an important one (maybe more important than the provided metrics, actually, in terms of roadmap ;)).
> >>>>>>>
> >>>>>>> Assuming we have pipelines, PTransforms, IOs, ... using the Metric API, how do we expose the metrics for the end-users?
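One possible shape for that, just to make the discussion concrete: the reporting sink JB asks for could be configured through pipeline options. This is a purely hypothetical sketch - none of these option names exist in Beam today, only the PipelineOptions/@Description/@Default machinery is real:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * Hypothetical options illustrating what a runner-agnostic metrics
 * "appender" configuration could look like; not an existing Beam API.
 */
public interface MetricsSinkOptions extends PipelineOptions {
  @Description("Where to push metric records, e.g. a Kafka topic or an Elasticsearch URL")
  String getMetricsSinkUri();
  void setMetricsSinkUri(String value);

  @Description("Push period, in seconds")
  @Default.Long(10)
  Long getMetricsPushPeriod();
  void setMetricsPushPeriod(Long value);
}

The actual appenders could then be discovered as an SPI, as JB suggests below, with the runner's native reporting used when no sink is configured.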
> >>>>>>>
> >>>>>>> A first approach would be to bind a JMX MBean server to the pipeline and expose the metrics via MBeans. I don't think it's a good idea, for the following reasons:
> >>>>>>> 1. It's not easy to know where the pipeline is actually executed, and so, not easy to find the MBean server URI.
> >>>>>>> 2. For the same reason, we can have port binding errors.
> >>>>>>> 3. Even if it could work for unbounded/streaming pipelines (as they are always "running"), it's not really applicable for bounded/batch pipelines, as their lifetime is "limited" ;)
> >>>>>>>
> >>>>>>> So, I think the "push" approach is better: during the execution, a pipeline "internally" collects and pushes the metrics to a backend. The "push" could be a kind of sink. For instance, the metric "records" can be sent to a Kafka topic, or directly to Elasticsearch or whatever. The metric backend can deal with alerting, reporting, etc.
> >>>>>>>
> >>>>>>> Basically, we have to define two things:
> >>>>>>> 1. The "appender" where the metrics have to be sent (and the corresponding configuration to connect, like the Kafka or Elasticsearch location).
> >>>>>>> 2. The format of the metric data (for instance, JSON).
> >>>>>>>
> >>>>>>> In Apache Karaf, I created something similar named Decanter:
> >>>>>>> http://blog.nanthrax.net/2015/07/monitoring-and-alerting-with-apache-karaf-decanter/
> >>>>>>> http://karaf.apache.org/manual/decanter/latest-1/
> >>>>>>>
> >>>>>>> Decanter provides collectors that harvest the metrics (like JMX MBean attributes, log messages, ...). Basically, for Beam, it would be directly the Metric API used by pipeline parts. Then, the metric records are sent to a dispatcher which sends them to an appender. The appenders store or send the metric records to a backend (Elasticsearch, Cassandra, Kafka, JMS, Redis, ...).
> >>>>>>>
> >>>>>>> I think it would make sense to provide the configuration and the Metric "appender" via the pipeline options. As it's not really runner specific, it could be part of the metric API (or SPI in that case).
> >>>>>>>
> >>>>>>> WDYT ?
> >>>>>>>
> >>>>>>> Regards
> >>>>>>> JB
> >>>>>>>
> >>>>>>> On 02/15/2017 09:22 AM, Stas Levin wrote:
> >>>>>>>> +1 to making the IO metrics (e.g. producers, consumers) available as part of the Beam pipeline metrics tree for debugging and visibility.
> >>>>>>>>
> >>>>>>>> As has already been mentioned, many IO clients have a metrics mechanism in place, so in these cases I think it could be beneficial to mirror their metrics under the relevant subtree of the Beam metrics tree.
> >>>>>>>>
> >>>>>>>> On Wed, Feb 15, 2017 at 12:04 AM Amit Sela <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> I think this is a great discussion and I'd like to relate to some of the points raised here, and raise some of my own.
> >>>>>>>>>
> >>>>>>>>> First of all, I think we should be careful here not to cross boundaries. IOs naturally have many metrics, and Beam should avoid "taking over" those.
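On Stas's point about mirroring IO client metrics: since the Metrics API can be used from any DoFn, an IO can report its pipeline-relevant numbers through the same tree as user-defined metrics without Beam "taking over" the client's own monitoring. A minimal sketch (the namespace and counter names are made up for illustration):

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Sketch of an IO-internal DoFn reporting pipeline-relevant metrics. */
class ReadRecordsFn extends DoFn<byte[], String> {
  // Illustrative names; they would appear under the IO's step in the metrics tree.
  private final Counter bytesRead = Metrics.counter("MyIO", "bytesRead");
  private final Counter recordsRead = Metrics.counter("MyIO", "recordsRead");

  @ProcessElement
  public void processElement(ProcessContext c) {
    byte[] raw = c.element();
    bytesRead.inc(raw.length);
    recordsRead.inc();
    c.output(new String(raw, StandardCharsets.UTF_8));
  }
}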
> >>>>>>>>> IO metrics should focus on what's relevant to the pipeline: input/output rate, backlog (for UnboundedSources, which exists in bytes, but for monitoring purposes we might want to consider #messages).
> >>>>>>>>>
> >>>>>>>>> I don't agree that we should skip doing this in Sources/Sinks and go directly to SplittableDoFn, because the IO API is familiar and known, and as long as we keep it, it should be treated as a first-class citizen.
> >>>>>>>>>
> >>>>>>>>> As for enable/disable - if IOs focus on pipeline-related metrics I think we should be fine, though this could also change between runners as well.
> >>>>>>>>>
> >>>>>>>>> Finally, considering "split-metrics" is interesting because on one hand it affects the pipeline directly (unbalanced partitions in Kafka may cause backlog), but this is that fine line of responsibilities (Kafka monitoring would probably be able to tell you that partitions are not balanced).
> >>>>>>>>>
> >>>>>>>>> My 2 cents, cheers!
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 14, 2017 at 8:46 PM Raghu Angadi <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> On Tue, Feb 14, 2017 at 9:21 AM, Ben Chambers <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> * I also think there are data source specific metrics that a given IO will want to expose (i.e., things like Kafka backlog for a topic.)
> >>>>>>>>>>
> >>>>>>>>>> UnboundedSource has an API for backlog. It is better for Beam/runners to handle backlog as well. Of course there will be some source-specific metrics too (errors, i/o ops, etc).
> >>>>>>>>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
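On Raghu's backlog point above: UnboundedSource readers can already report backlog, so runners could surface it uniformly instead of each IO re-inventing it. A sketch of the reader side only (bytesBehindLatestOffset() is a made-up hook into the source's own offset accounting, and the other UnboundedReader methods are omitted):

import org.apache.beam.sdk.io.UnboundedSource;

/** Fragment of an unbounded reader that reports its backlog to the runner. */
public abstract class BacklogAwareReader<T> extends UnboundedSource.UnboundedReader<T> {
  /** Hypothetical hook into the client's own lag/offset accounting. */
  protected abstract long bytesBehindLatestOffset();

  @Override
  public long getSplitBacklogBytes() {
    long backlog = bytesBehindLatestOffset();
    // Runners can aggregate per-split backlog into a pipeline-level metric.
    return backlog >= 0 ? backlog : BACKLOG_UNKNOWN;
  }
}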
