If this is a common pattern, then I think it's worth considering the level of effort to bake it into airflow, both for developer convenience and safety. With support baked in, a developer working on dags in the cluster is much less likely to accidentally override important callbacks that feed into observability.

What about extending the callback system so that a user instantiating an operator can decide whether to override the callbacks coming from the dag/base operator or append to a list of callbacks?
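
To make that concrete, here's a rough sketch of what the "append" option could feel like from a dag author's point of view. To be clear, this is hypothetical: the list form of on_success_callback below is invented to illustrate the proposal, it is not what the callback system does today.

# Hypothetical sketch only -- the list-of-callbacks behaviour shown at the
# bottom is the proposal, not something the current callback system supports.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def emit_observability_metrics(context):
    # cluster-wide callback that feeds the monitoring system
    ti = context["task_instance"]
    print("task %s finished in state %s" % (ti.task_id, ti.state))


def notify_team(context):
    # something a dag author wants *in addition*, not instead
    print("pinging the team...")


dag = DAG(
    "example_dag",
    start_date=datetime(2019, 1, 1),
    default_args={"on_success_callback": emit_observability_metrics},
)

# Today this kwarg silently replaces the default_args callback. Under the
# proposal, passing a list (or some explicit "append" flag) would add to it
# instead of overriding it:
task = BashOperator(
    task_id="do_work",
    bash_command="echo hi",
    on_success_callback=[emit_observability_metrics, notify_team],  # hypothetical
    dag=dag,
)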

On Sun, Mar 31, 2019, 5:12 PM Brian Greene <br...@heisenbergwoodworking.com> wrote:

> My answer is like his.
>
> It still requires all devs making dags to use the same mechanism, but it’s reaaaaaly easy (assuming you’re not already using those hooks).
>
> We have a single python utility function that is set as “on_success_callback” AND for failure. Set it in “default_args” once for the dag and the entire thing is “instrumented” somewhat well without any other code change.
>
> It grabs task metadata, grabs dagrun.conf (super useful if you do a lot of triggered dags where the dag run is your “param” structure).
>
> We took the path of completely leaving airflow for the rest of the work... that payload gets sent to a single API endpoint. Said endpoint has a lib that does “things” with the messages... logs in dynamo for starters, some carry on to slack notifications, most get passed to the logging/eventing service after some cleanup...
>
> You can write a whole little handler thingy to handle all the messages and get whatever you want done, with almost no code in airflow or the dags.
>
> B
>
> Sent from a device with less than stellar autocorrect
>
> > On Mar 31, 2019, at 3:29 PM, Stachurski, Stephan <stephan.stachur...@nytimes.com> wrote:
> >
> > Hi-
> >
> > I'm pretty new to airflow, and I'm trying to work on getting visibility/observability into what airflow is doing.
> >
> > I would like to be able to observe things about dag runs and task instances. I would like to be able to send metrics to time series databases (possibly extending the existing airflow metrics exported in statsd format). I would like to be able to label these data points with things like the dag run id. Let's say that every time a task succeeded, was retried, or failed, it sent a datapoint where the value was the duration of the task, and it was labeled with dag run id. Then I could plot these datapoints in a stacked bar graph, where each stack includes the total duration of all tasks in a dag run, the height is the total duration, and I could analyze whether my dag runs are getting faster or slower over time.
> >
> > I would like information from airflow state to be able to label metric datapoints. I would like to be able to apply my own labels (perhaps via a callback that could be executed at the time the metric is published, and given context parameter(s)).
> >
> > I believe I can do this today, but it requires quite a bit of work on my end, and all developers working on my team/cluster will sort of need to be onboarded with a standard way of doing things, otherwise not everything running in the cluster will be observed the same way.
> >
> > I could try adding callbacks to all my operators. But there are some problems here. When the on_success_callback runs, I know when the task started, but I have to kind of infer the total duration by taking the difference between the task start time and "now" in the middle of the callback. Also it's kind of tedious and error-prone to make sure that these callbacks are actually used everywhere.
> >
> > I could use custom operators, which is slightly less tedious than augmenting every operator instance, but still not as elegant to me as the idea of airflow offering task, operator, and/or dag middleware extension points.
> >
> > An alternative way of approaching the problem would be if airflow fired webhooks, corresponding to events like dag runs starting, ending, being cleared, or tasks starting, being retried, etc. Think of github firing webhooks when branches are updated, PRs opened, updated, closed, etc, and CI/CD systems loosely coupled to github webhooks that run builds and deploy code. If you could configure airflow to make a request to a webhook endpoint whenever something happened, you could include a bunch of relevant state in the payload. The listener could use the info in the payload to build the metrics I need, and more. The listener could also make further requests to the airflow api if necessary.
> >
> > I haven't really explored how the perspective on the problem changes if I wanted to use pull style metrics instead of push style. For example, if I wanted to get the same graph I described above, except from prometheus scraping airflow, then maybe I wouldn't need middleware or hooks. If airflow fires webhooks, then I need a webhook listener to interpret these hooks and get data into my monitoring system. If airflow provides middleware extension points, then I need to stick my custom code directly within airflow, which sort of introduces a coupling between airflow and the monitoring system. If I provide a metrics endpoint for something like prometheus to scrape, then additional logic or actions I would have stuck on my webhook listener now have to plug into or monitor prometheus instead.
> >
> > I'm putting this out to the list for two reasons:
> >
> > Maybe someone can suggest a simple solution where I can write a small amount of code in one place to glue things together, and not depend on devs on my team always reaching for the correct custom operator if they're used to using the standard ones.
> >
> > If there is no satisfactory solution, then what's the right way to evolve airflow so that there is a satisfactory solution?
> >
> > For additional context, I asked about this on slack already:
> >
> > https://apache-airflow.slack.com/archives/CCR6P6JRL/p1551743461193800
> > https://apache-airflow.slack.com/archives/CCR6P6JRL/p1553171296597100
> >
> > I have to admit that I don't fully understand what @bosnjak was getting at here, maybe this is really the answer to all my problems:
> >
> >> bosnjak [10 days ago]
> > if you want to handle them differently depending on the task, you should have a single callback handler that can route the calls depending on the context, which is passed to the handler by default. You can access task instance and other stuff from there.
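
Coming back to Brian's reply: for anyone else reading along, a minimal version of that default_args pattern might look roughly like the following. The endpoint URL and the payload field names are made up for illustration, and, as Stephan points out, the duration has to be inferred as "now minus task start time" inside the callback.

from datetime import datetime, timezone

import requests  # assumes requests is installed where tasks run

EVENTS_ENDPOINT = "https://observability.example.internal/airflow-events"  # placeholder


def report_task_event(context):
    # single utility callback used for both success and failure
    ti = context["task_instance"]
    dag_run = context.get("dag_run")
    # assumes timezone-aware task instance dates (airflow's default)
    now = datetime.now(timezone.utc)
    duration_seconds = (
        (now - ti.start_date).total_seconds() if ti.start_date else None
    )
    payload = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": dag_run.run_id if dag_run else None,
        "state": ti.state,
        "try_number": ti.try_number,
        "duration_seconds": duration_seconds,
        # dagrun.conf is handy when triggered dag runs carry the "params"
        "dag_run_conf": dag_run.conf if dag_run else None,
    }
    requests.post(EVENTS_ENDPOINT, json=payload, timeout=5)


# set once, and every task in the dag is instrumented
default_args = {
    "on_success_callback": report_task_event,
    "on_failure_callback": report_task_event,
}

Everything downstream of that (dynamo, slack, the eventing service) then lives behind the one endpoint, outside of airflow and the dags, which I think is the appeal.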
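
And my reading of the @bosnjak comment at the bottom is a single dispatcher callback that fans out based on what's in the context, something like this (the handler names and the task_id-prefix convention are invented for the example):

def handle_etl_task(context):
    print("etl task event: %s" % context["task_instance"].task_id)


def handle_reporting_task(context):
    print("reporting task event: %s" % context["task_instance"].task_id)


# invented convention: route on a task_id prefix; routing on dag_id,
# operator class, or anything else in the context would work the same way
ROUTES = {
    "etl_": handle_etl_task,
    "report_": handle_reporting_task,
}


def route_task_event(context):
    # the one callback handler wired up everywhere; it just dispatches
    ti = context["task_instance"]
    for prefix, handler in ROUTES.items():
        if ti.task_id.startswith(prefix):
            handler(context)
            return
    print("unrouted task event: %s.%s -> %s" % (ti.dag_id, ti.task_id, ti.state))

That still leaves the problem of every dag author remembering to wire the callback up, which is what the append-to-a-list idea at the top is meant to address.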