I guess a PR to propose something for that would be a good start :). I am fine - for now - with forbidding it "as-is", and then maybe the next step, once we do that, is to figure out a nicer way of doing it than Hooks. Maybe it can be done in parallel.
On Fri, Mar 1, 2024 at 9:19 PM Blain David <david.bl...@infrabel.be> wrote:
>
> It's certainly possible to check from where a Python method is being called
> using traceback.
>
> I do think prohibiting the execute method of an operator being called
> manually would be a good idea. I've also come across this in multiple DAGs,
> and it is ugly and looks like a hack.
> Maybe in the beginning we could just print a warning message, but once a
> minor or major release occurs, we could raise an exception instead.
>
> We could refactor the BaseOperatorMeta class so we check from where the
> execute method is being called.
>
> Maybe there are other ways to achieve the same thing, but this is one of the
> possible solutions I came up with.
>
> Below is an example of how it could be done, together with a unit test to
> verify its behaviour:
>
> import traceback
> from abc import ABCMeta
> from typing import Any
> from unittest import TestCase
> from unittest.mock import Mock, patch, MagicMock
>
> import pendulum
> from airflow import AirflowException, DAG, settings
> from airflow.models import TaskInstance, DagRun
> from airflow.models.baseoperator import BaseOperatorMeta, BaseOperator
> from airflow.utils.context import Context
> from airflow.utils.state import DagRunState
> from assertpy import assert_that
> from mockito import mock, ANY, when, KWARGS
> from sqlalchemy.orm import Session, Query
>
>
> def executor_safeguard():
>     def decorator(func):
>         def wrapper(*args, **kwargs):
>             # TODO: here we would need some kind of switch to detect if we are in
>             # testing mode or not, as we want to be able to execute the execute
>             # method of the operators directly from within unit tests
>             caller_frame = traceback.extract_stack()[-2]  # caller frame, excluding the current frame
>             if caller_frame.name == "_execute_task" and "taskinstance" in caller_frame.filename:
>                 return func(*args, **kwargs)
>             raise AirflowException(f"Method {func.__name__} cannot be called from {caller_frame.name}")
>         return wrapper
>     return decorator
>
>
> def patched_base_operator_meta_new(cls, clsname, bases, clsdict):
>     execute_method = clsdict.get("execute")
>     if callable(execute_method) and not getattr(execute_method, '__isabstractmethod__', False):
>         clsdict["execute"] = executor_safeguard()(execute_method)
>     return ABCMeta.__new__(cls, clsname, bases, clsdict)
>
>
> # for demo purposes, I'm patching the __new__ magic method of BaseOperatorMeta
> BaseOperatorMeta.__new__ = patched_base_operator_meta_new
>
>
> class HelloWorldOperator(BaseOperator):
>     called = False
>
>     def execute(self, context: Context) -> Any:
>         HelloWorldOperator.called = True
>         return f"Hello {self.owner}!"
>
>
> class IterableSession(Session):
>     def __next__(self):
>         pass
>
>
> class ExecutorSafeguardTestCase(TestCase):
>
>     def test_executor_safeguard_when_unauthorized(self):
>         with self.assertRaises(AirflowException):
>             dag = DAG(dag_id="hello_world")
>             context = mock(spec=Context)
>
>             HelloWorldOperator(task_id="task_id", dag=dag).execute(context=context)
>
>     @patch("sqlalchemy.orm.Session.__init__")
>     def test_executor_safeguard_when_authorized(self, mock_session: MagicMock):
>         query = mock(spec=Query)
>         when(query).filter_by(**KWARGS).thenReturn(query)
>         when(query).filter(ANY).thenReturn(query)
>         when(query).scalar().thenAnswer(lambda: "dag_run_id")
>         when(query).delete()
>         session = mock(spec=IterableSession)
>         when(session).query(ANY).thenReturn(query)
>         when(session).scalar(ANY)
>         when(session).__iter__().thenAnswer(lambda: iter({}))
>         when(session).commit()
>         when(session).close()
>         when(session).execute(ANY)
>         when(session).add(ANY)
>         when(session).flush()
>         when(settings).Session().thenReturn(session)
>
>         mock_session.return_value = session
>
>         dag = DAG(dag_id="hello_world")
>         TaskInstance.get_task_instance = Mock(return_value=None)
>         when(TaskInstance).get_task_instance(
>             dag_id="hello_world",
>             task_id="hello_operator",
>             run_id="run_id",
>             map_index=-1,
>             select_columns=True,
>             lock_for_update=False,
>             session=session,
>         ).thenReturn(None)
>
>         operator = HelloWorldOperator(task_id="hello_operator", dag=dag)
>
>         assert_that(operator.called).is_false()
>
>         task_instance = TaskInstance(task=operator, run_id="run_id")
>         task_instance.task_id = "hello_operator"
>         task_instance.dag_id = "hello_world"
>         task_instance.dag_run = DagRun(
>             run_id="run_id",
>             dag_id="hello_world",
>             execution_date=pendulum.now(),
>             state=DagRunState.RUNNING,
>         )
>         task_instance._run_raw_task(test_mode=True, session=session)
>
>         assert_that(operator.called).is_true()
>
> Maybe start a pull request for this one? What do you guys think?
>
> Kind regards,
> David
>
> -----Original Message-----
> From: Andrey Anshin <andrey.ans...@taragol.is>
> Sent: Tuesday, 27 February 2024 14:36
> To: dev@airflow.apache.org
> Subject: Re: Bad mixing of decorated and classic operators (users shooting
> themselves in their foot)
>
> I can't see which problem is solved by allowing running one operator inside
> another.
>
> *Propagate templated fields?*
> In most cases it could be changed for a particular task or the entire operator
> by monkey patching, which in this case is safer than running an operator inside
> an operator. Or it is even available out of the box.
>
> *Do an expensive operation for preparing parameters to the classical
> operators?*
> Well, just calculate it in a separate taskflow operator and propagate the
> output into the classical operator; most of them do not expect to have a huge
> input, and the types are primitives.
>
> I'm not sure it is possible to solve all things on top of the Task/Operators, e.g.:
> - Run it inside @task.docker, @task.kubernetes, @task.external, @task.virtualenv
> - Run deferrable operators
> - Run rescheduling sensors
>
> Technically we could just run it in a separate executor inside the worker; I
> guess that is the same way it worked in the past, when the K8S and Celery
> executors had a "Run Task" option.
>
> Another popular case is running an operator inside callbacks, which is pretty
> popular for Slack, because some guidance from the Airflow 1.10.x era still
> exists on the Internet even though we have Hooks and Notifiers for that. Some
> links from the first page of Google for an "airflow slack notifications" query:
>
> *Bad Examples:*
> - https://towardsdatascience.com/automated-alerts-for-airflow-with-slack-5c6ec766a823
> - https://www.reply.com/data-reply/en/content/integrating-slack-alerts-in-airflow
> - https://awstip.com/integrating-slack-with-airflow-step-by-step-guide-to-setup-alerts-1dc71d5e65ef
> - https://naiveskill.com/airflow-slack-alert/
> - https://gist.github.com/kzzzr/a2a4152f6a7c03cd984e797c08ac702f
> - https://docs.astronomer.io/learn/error-notifications-in-airflow#legacy-slack-notifications-pre-26
>
> *Good Examples:*
> - https://www.restack.io/docs/airflow-knowledge-apache-providers-slack-webhook-http-pypi-operator
> - https://docs.astronomer.io/learn/error-notifications-in-airflow?tab=taskflow#example-pre-built-notifier-slack
>
> I do not know how to force users not to use this approach, but I guess it is
> better to avoid "if you can't win it, lead it".
>
>
> On Tue, 27 Feb 2024 at 13:48, Jarek Potiuk <ja...@potiuk.com> wrote:
>
> > Yeah. I kinda like (and see it emerging from the discussion) that we
> > can (which I love) have our cake and eat it too :). Meaning - I think we can
> > have both 1) and 2) ...
> >
> > 1) We should raise an error if someone uses an operator in task context
> > (the way TP described it would work nicely) - making calling the
> > `execute` pattern directly wrong
> > 2) MAYBE we can figure out a way to actually allow the users to use
> > the logic that Bolke described in a nice and "supported" way. I would
> > actually love it if we found an easy way to make the 3500+ operators
> > we have immediately available to our taskflow users.
> >
> > I don't particularly like the idea of having a run_bash, run_xxxx etc.
> > for all the 3500+ operators we have.
> >
> > But how about just:
> >
> > result = run_operator(BashOperator(...))
> >
> > If we pair it with the error from point 1) (and tell the user "Hey, if
> > you want to do that you need to do it this way: run_operator(...)") and
> > implement appropriate pre/post processing in the run_operator logic, I see
> > no reason why it would not work.
> > Yes - it is slightly ugly, but only slightly - and it's entirely NOT
> > magical, and it is explicit. You would not even have to run `execute()` directly.
> > Heck, we could likely implement run_operator() in a way that works nicely
> > for sensors and deferrable operators and properly handles all the cases,
> > I **think**. And we would have to properly design the rules on what happens
> > with xcom and task ids (because the question is whether such an xcom should
> > belong to the parent task id, or maybe we should have it for an "internal"
> > task_id, etc.).
> >
> > I think that would be super-powerful. As TP wrote - operators would
> > mostly fade away - but not disappear entirely.
> >
> > J.
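For illustration, a minimal sketch of what such a run_operator() helper could look like. This is hypothetical: run_operator() is not an existing Airflow API, and the use of get_current_context() plus the pre/post processing shown are assumptions about the idea above, not a definitive implementation:

# Hypothetical sketch only: run_operator() is not an existing Airflow API, and the
# pre/post processing below is a rough approximation of what the worker normally does.
from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import get_current_context


def run_operator(operator: BaseOperator):
    """Run a classic operator's business logic inside a TaskFlow task."""
    context = get_current_context()             # reuse the surrounding task's context
    operator.render_template_fields(context)    # resolve templated fields up front
    operator.pre_execute(context)
    result = operator.execute(context)
    operator.post_execute(context, result)
    return result


@task
def some_dummy_task():
    # the returned value is pushed to XCom under the *wrapping* task's id
    return run_operator(BashOperator(task_id="inner_bash", bash_command='echo "pushing 2"'))

In this sketch the inner operator's return value is simply returned from the wrapping TaskFlow task, so it lands in XCom under the parent task's id - which is exactly the xcom/task-id design question raised above.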
> >
> >
> > On Tue, Feb 27, 2024 at 10:06 AM Andrey Anshin <andrey.ans...@taragol.is> wrote:
> >
> > > I think manually using *execute*, *poke*, *choose_branch*, etc. should be
> > > considered an antipattern; these methods should only be invoked by the
> > > Airflow worker.
> > > You never know what should be done beforehand, especially if it is about a
> > > deferrable operator or a sensor in reschedule mode.
> > >
> > >
> > > On Tue, 27 Feb 2024 at 12:30, Bolke de Bruin <bdbr...@gmail.com> wrote:
> > >
> > > > Interesting! Jarek and I had a bit of a discussion on Slack a couple of
> > > > months ago in the context of Operators Need to Die (if I recall correctly).
> > > >
> > > > The idea behind Operators is that you have a vetted way of executing a
> > > > certain logic apart from the boilerplate (i.e. execute, post_execute) that
> > > > makes things work as a Task. A TaskFlow task only provides the boilerplate.
> > > > So if we can separate out the business logic from the boilerplate and
> > > > generalize it so that it becomes useful in both TaskFlow and classic
> > > > Operators, it would allow for a more flexible pattern in TaskFlow and, if
> > > > required, a more regulated / strict pattern with Operators, and thus be
> > > > backwards compatible.
> > > >
> > > > Jarek then argued for snake-case calls to those library functions, like so
> > > > (paraphrasing):
> > > >
> > > > @task(task_id='some_id', provide_context=True)
> > > > def some_dummy_task(**context) -> int:
> > > >     ti = context['ti']
> > > >     cmd2 = 'echo "pushing 2"'
> > > >     return run_bash(cmd=cmd2)
> > > >
> > > > There is little logic in the BashOperator that would prevent us from
> > > > rewiring it so that it also calls out to run_bash_command while keeping the
> > > > same signature.
> > > >
> > > > Naming could be argued upon. It could be a standard pattern like:
> > > >
> > > > BashOperator -> run_bash() or just bash()
> > > > EmailOperator -> run_email() or email()
> > > >
> > > > I *think* I prefer a library pattern for this rather than hooks. Hooks just
> > > > don't feel entirely right. They are primarily there to manage connections.
> > > > In that way I can imagine that they register themselves upon usage for
> > > > `on_kill` or something like that to ensure proper resource management.
> > > > Maybe a HookMixin that gets autowired into the library function so that
> > > > certain things get auto-registered (Bash has the subprocess thing, this can
> > > > be generalized).
> > > >
> > > > Anyway, with the above described pattern I like option 1 more because it
> > > > distinguishes between opinionated / strict / classic and flexible.
> > > >
> > > > my 2 cents,
> > > > Bolke
> > > >
> > > >
> > > > On Tue, 27 Feb 2024 at 08:33, Tzu-ping Chung <t...@astronomer.io.invalid> wrote:
> > > >
> > > > > This kind of ties back to Bolke's Operator Must Die manifesto - you
> > > > > shouldn't use the operator class here at all in this situation, but the
> > > > > corresponding hook instead.
> > > > >
> > > > > Regarding preventing this anti-pattern, I can think of two ways:
> > > > >
> > > > > 1. Add a check in BaseOperator to detect whether __init__ is called in a
> > > > >    task work context, and error out guiding users to use hooks instead.
> > > > > 2. Accept this as a valid pattern, and actually push to XCom.
> > > > >
> > > > > Personally I like option 1 more. Option 2 is admittedly less disruptive,
> > > > > but I'm pretty confident people will find other ways to use operators
> > > > > that do not work. Operators will never die, but they should fade away.
> > > > > <https://en.wikipedia.org/wiki/Old_soldiers_never_die>
> > > > >
> > > > > TP
> > > > >
> > > > >
> > > > > > On 27 Feb 2024, at 15:09, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > > > >
> > > > > > Hello here,
> > > > > >
> > > > > > I have seen recently, at least a few times, that our users started to
> > > > > > use a strange pattern:
> > > > > >
> > > > > > @task(task_id='some_id', provide_context=True)
> > > > > > def some_dummy_task(**context):
> > > > > >     ti = context['ti']
> > > > > >     cmd2 = 'echo "pushing 2"'
> > > > > >     dbt_task = BashOperator(task_id='some_dummy_task_id',
> > > > > >                             bash_command=cmd2,
> > > > > >                             do_xcom_push=True)
> > > > > >     dbt_task.execute(context)
> > > > > >
> > > > > > This **works** for them, but then - for example - they complain that
> > > > > > the xcom is not pushed (well, of course it's not if you use an operator
> > > > > > and manually run execute).
> > > > > >
> > > > > > Now - while this is a bad pattern, obviously - it seems that our users
> > > > > > **think** they can do it. And maybe we can do something to prevent them
> > > > > > shooting themselves in their foot?
> > > > > >
> > > > > > I do not know the @task decoration magic that well, but maybe we could
> > > > > > somehow detect the case where someone instantiates an operator (and runs
> > > > > > execute) inside a decorated task and give a helpful error in this case?
> > > > > > I am afraid people will start using it more and more, and the sooner we
> > > > > > add protection against it, the better chance we have to contain it.
> > > > > >
> > > > > > J.
> > > > > > --
> > > >
> > > > --
> > > > Bolke de Bruin
> > > > bdbr...@gmail.com
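For contrast, a minimal sketch of the hook-based alternative TP suggests for the same "run a shell command from a @task" case. It relies on SubprocessHook from airflow.hooks.subprocess (the hook BashOperator delegates to internally); the task body itself is only illustrative:

# Minimal sketch of the hook-based alternative: use the hook directly instead of
# instantiating BashOperator and calling execute() by hand.
from airflow.decorators import task
from airflow.hooks.subprocess import SubprocessHook


@task(task_id="some_dummy_task")
def some_dummy_task() -> str:
    cmd2 = 'echo "pushing 2"'
    result = SubprocessHook().run_command(command=["bash", "-c", cmd2])
    return result.output  # returned value becomes this task's XCom, no do_xcom_push needed

Because the value is returned from the TaskFlow task, the normal task machinery pushes it to XCom, with no manual do_xcom_push or execute() call involved.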