It's certainly possible to check where a Python method is being called from using traceback.

I do think prohibiting manual calls to an operator's execute method would be a good idea. I've also come across this in multiple DAGs; it is ugly and looks like a hack. Maybe in the beginning we could just print a warning message, and then, once a minor or major release occurs, raise an exception instead.

We could refactor the BaseOperatorMeta class so that we check where the execute method is being called from. Maybe there are other ways to achieve the same, but this is one of the possible solutions I came up with. Below is an example of how it could be done, together with a unit test to verify its behaviour:

    import traceback
    from abc import ABCMeta
    from typing import Any
    from unittest import TestCase
    from unittest.mock import Mock, patch, MagicMock

    import pendulum
    from airflow import AirflowException, DAG, settings
    from airflow.models import TaskInstance, DagRun
    from airflow.models.baseoperator import BaseOperatorMeta, BaseOperator
    from airflow.utils.context import Context
    from airflow.utils.state import DagRunState
    from assertpy import assert_that
    from mockito import mock, ANY, when, KWARGS
    from sqlalchemy.orm import Session, Query


    def executor_safeguard():
        def decorator(func):
            def wrapper(*args, **kwargs):
                # TODO: here we would need some kind of switch to detect if we are in testing mode or not as
                # we want to be able to execute the execute method of the operators directly from within unit tests
                caller_frame = traceback.extract_stack()[-2]  # Get the caller frame excluding the current frame
                if caller_frame.name == "_execute_task" and "taskinstance" in caller_frame.filename:
                    return func(*args, **kwargs)
                raise AirflowException(f"Method {func.__name__} cannot be called from {caller_frame.name}")
            return wrapper
        return decorator


    def patched_base_operator_meta_new(cls, clsname, bases, clsdict):
        execute_method = clsdict.get("execute")
        # Only wrap concrete execute implementations, not abstract declarations
        if callable(execute_method) and not getattr(execute_method, '__isabstractmethod__', False):
            clsdict["execute"] = executor_safeguard()(execute_method)
        return ABCMeta.__new__(cls, clsname, bases, clsdict)


    # for demo purposes, I'm patching the __new__ magic method of BaseOperatorMeta
    BaseOperatorMeta.__new__ = patched_base_operator_meta_new


    class HelloWorldOperator(BaseOperator):
        called = False

        def execute(self, context: Context) -> Any:
            HelloWorldOperator.called = True
            return f"Hello {self.owner}!"


    class IterableSession(Session):
        def __next__(self):
            pass


    class ExecutorSafeguardTestCase(TestCase):

        def test_executor_safeguard_when_unauthorized(self):
            with self.assertRaises(AirflowException):
                dag = DAG(dag_id="hello_world")
                context = mock(spec=Context)

                HelloWorldOperator(task_id="task_id", dag=dag).execute(context=context)

        @patch("sqlalchemy.orm.Session.__init__")
        def test_executor_safeguard_when_authorized(self, mock_session: MagicMock):
            query = mock(spec=Query)
            when(query).filter_by(**KWARGS).thenReturn(query)
            when(query).filter(ANY).thenReturn(query)
            when(query).scalar().thenAnswer(lambda: "dag_run_id")
            when(query).delete()
            session = mock(spec=IterableSession)
            when(session).query(ANY).thenReturn(query)
            when(session).scalar(ANY)
            when(session).__iter__().thenAnswer(lambda: iter({}))
            when(session).commit()
            when(session).close()
            when(session).execute(ANY)
            when(session).add(ANY)
            when(session).flush()
            when(settings).Session().thenReturn(session)

            mock_session.return_value = session

            dag = DAG(dag_id="hello_world")
            TaskInstance.get_task_instance = Mock(return_value=None)
            when(TaskInstance).get_task_instance(
                dag_id="hello_world",
                task_id="hello_operator",
                run_id="run_id",
                map_index=-1,
                select_columns=True,
                lock_for_update=False,
                session=session,
            ).thenReturn(None)

            operator = HelloWorldOperator(task_id="hello_operator", dag=dag)

            assert_that(operator.called).is_false()

            task_instance = TaskInstance(task=operator, run_id="run_id")
            task_instance.task_id = "hello_operator"
            task_instance.dag_id = "hello_world"
            task_instance.dag_run = DagRun(run_id="run_id", dag_id="hello_world",
                                           execution_date=pendulum.now(), state=DagRunState.RUNNING)
            # Going through the task instance is the authorized path to execute()
            task_instance._run_raw_task(test_mode=True, session=session)

            assert_that(operator.called).is_true()

Maybe start a pull request for this one? What do you guys think?

Kind regards,
David

-----Original Message-----
From: Andrey Anshin <andrey.ans...@taragol.is>
Sent: Tuesday, 27 February 2024 14:36
To: dev@airflow.apache.org
Subject: Re: Bad mixing of decorated and classic operators (users shooting themselves in their foot)

I can't see which problem is solved by allowing one operator to run inside another.

*Propagate templated fields?*
In most cases this can be changed for a particular task or an entire operator by monkey patching, which in this case is safer than running an operator inside an operator. It may even be available out of the box.

*Do an expensive operation to prepare parameters for the classic operators?*
Well, just calculate it in a separate TaskFlow operator and propagate the output into the classic operator; most of them do not expect huge input, and the types are primitives.

I'm not sure it is possible to solve everything on top of Tasks/Operators, e.g.:
- Run it in @task.docker, @task.kubernetes, @task.external, @task.virtualenv
- Run deferrable operators
- Run rescheduling sensors

Technically we could just run it in a separate executor inside the worker, I guess in the same way it worked in the past, when the K8S and Celery executors had a Run Task option.

Another popular case is running an operator inside callbacks. This is pretty common for Slack, because guidance dating from Airflow 1.10.x still exists on the Internet even though we have Hooks and Notifiers for that. Some links from the first page of Google results for "airflow slack notifications":

*Bad Examples:*
- https://towardsdatascience.com/automated-alerts-for-airflow-with-slack-5c6ec766a823
- https://www.reply.com/data-reply/en/content/integrating-slack-alerts-in-airflow
- https://awstip.com/integrating-slack-with-airflow-step-by-step-guide-to-setup-alerts-1dc71d5e65ef
- https://naiveskill.com/airflow-slack-alert/
- https://gist.github.com/kzzzr/a2a4152f6a7c03cd984e797c08ac702f
- https://docs.astronomer.io/learn/error-notifications-in-airflow#legacy-slack-notifications-pre-26

*Good Examples:*
- https://www.restack.io/docs/airflow-knowledge-apache-providers-slack-webhook-http-pypi-operator
- https://docs.astronomer.io/learn/error-notifications-in-airflow?tab=taskflow#example-pre-built-notifier-slack

I do not know how to force users not to use this approach, but I guess it is better to avoid "If you can't win it, lead it".

On Tue, 27 Feb 2024 at 13:48, Jarek Potiuk <ja...@potiuk.com> wrote:

> Yeah. I kinda like (and see it emerging from the discussion) that we
> can (which I love) have cake and eat it too :). Means - I think we can
> have both 1) and 2) ...
>
> 1) We should raise an error if someone uses AnOperator in task context
> (the way TP described it would work nicely) - making calling the
> `execute` pattern directly wrong
> 2) MAYBE we can figure out a way to actually allow the users to use
> the logic that Bolke described in a nice, and "supported" way. I would
> actually love it, if we find an easy way to make the 3500+ operators
> we have - immediately available to our taskflow users.
>
> I don't particularly like the idea of having a run_bash, run_xxxx etc.
> for all the 3500+ operators we have.
>
> But how about just:
>
> result = run_operator(BashOperator(.,...))
>
> If we pair it with the error from point 1) (and tell the user "Hey, if
> you want to do that you need to do it this way: run_operator(...") and
> implement appropriate pre/post processing in run_operator logic, I see
> no reason why it would not work..
> Yes - it is slightly ugly, but only slightly - and it's entirely NOT
> magical and explicit. You would not even have to run `execute()` directly.
> Heck we could likely implement the run_operator() in the way to nicely
> work for sensors and deferrable operators and properly implement
> handling of all the cases I **think**. And properly design the rules
> on what happens with xcom and task ids (because the question is
> whether such xcom should belong to the parent task id or maybe we
> should have it for "internal" task_id etc. etc.
>
> I think that would be super-powerful, as TP wrote - operator would
> mostly fade away - but not disappear entirely
>
> J.
>
> On Tue, Feb 27, 2024 at 10:06 AM Andrey Anshin <andrey.ans...@taragol.is> wrote:
>
> > I think manually using *execute*, *poke*, *choose_branch* and etc.
> > should be considered as an antipattern, these methods only should be
> > invoked by Airflow Worker.
> > You never know what should be done before, especially if it about
> > deferrable operator or sensors in reschedule mode
> >
> > On Tue, 27 Feb 2024 at 12:30, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >
> > > Interesting! Jarek and I had a bit of a discussion on slack a
> > > couple of months ago in the context of Operators Need to Die (if I
> > > recall correctly).
> > >
> > > The idea behind Operators is that you have a vetted way of
> > > executing a certain logic apart from the boilerplate (i.e.
> > > execute, post_execute) that makes things work as a Task. A
> > > TaskFlow task only provides the boilerplate.
> > > So if we can separate out the business logic from the boilerplate
> > > and generalize it as such so that it becomes useful in both
> > > TaskFlow and classic Operators it would allow for a more flexible
> > > pattern in TaskFlow and if required a more regulated / strict
> > > pattern with Operators and thus being backwards compatible.
> > >
> > > Jarek argued for snake case calling of those library functions
> > > then like so (paraphrasing):
> > >
> > > @task(task_id='some_id', provide_context=True)
> > > def some_dummy_task(**context) -> int:
> > >     ti = context['ti']
> > >     cmd2 = 'echo "pushing 2"'
> > >     return run_bash(cmd=cmd2)
> > >
> > > There is little logic in the BashOperator that would prevent us
> > > from rewiring it so that it also calls out run_bash_command while
> > > keeping the same signature.
> > >
> > > Naming could be argued upon. It could be a standard pattern like:
> > >
> > > BashOperator -> run_bash() or just bash()
> > > EmailOperator -> run_email or email()
> > >
> > > I *think* I prefer a library pattern for this rather than hooks.
> > > Hooks just don't feel entirely right. They are primarily there to
> > > manage connections. In that way I can imagine that they register
> > > themselves upon usage for `on_kill` or something like that to
> > > ensure proper resource management. Maybe a HookMixin that gets
> > > autowired into the library function so that certain things get
> > > auto registered (Bash has the subprocess thing, this can be
> > > generalized).
> > >
> > > Anyways with the above described pattern I like option 1 more
> > > because it distinguishes between opinionated / strict / classic
> > > and flexible.
> > >
> > > my 2 cents,
> > > Bolke
> > >
> > > On Tue, 27 Feb 2024 at 08:33, Tzu-ping Chung <t...@astronomer.io.invalid> wrote:
> > >
> > > > This kind of ties back to Bolke’s Operator Must Die manifesto -
> > > > you shouldn’t use the operator class here at all in this
> > > > situation, but the corresponding hook instead.
> > > >
> > > > Regarding preventing this anti-pattern, I can think of two ways:
> > > >
> > > > 1. Add a check in BaseOperator to detect whether __init__ is
> > > >    called in a task work context, and error out guiding users to
> > > >    use hooks instead.
> > > > 2. Accept this as a valid pattern, and actually push to XCom.
> > > >
> > > > Personally I like options 1 more. Option 2 is admittedly less
> > > > disruptive, but I’m pretty confident people will find other ways
> > > > to use operators that do not work. Operators will never die, but
> > > > they should fade away.
> > > > <https://en.wikipedia.org/wiki/Old_soldiers_never_die>
> > > >
> > > > TP
> > > >
> > > > > On 27 Feb 2024, at 15:09, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > > >
> > > > > Hello here,
> > > > >
> > > > > I have seen recently at least a few times that our users
> > > > > started to use a strange pattern
> > > > >
> > > > > @task(task_id='some_id', provide_context=True)
> > > > > def some_dummy_task(**context):
> > > > >     ti = context['ti']
> > > > >     cmd2 = 'echo "pushing 2"'
> > > > >     dbt_task = BashOperator(task_id='some_dummy_task_id',
> > > > >                             bash_command=cmd2,
> > > > >                             do_xcom_push=True)
> > > > >     dbt_task.execute(context)
> > > > >
> > > > > This **works** for them but then - for example - they complain
> > > > > that xcom is not pushed (well of course it's not if you use
> > > > > operator and manually run execute).
> > > > >
> > > > > Now - while this is a bad pattern - obviously - it seems that
> > > > > our users **think** they can do it. And maybe we can do
> > > > > something to prevent them shooting themselves in their foot?
> > > > >
> > > > > I do not know that well the @task decoration magic, but maybe
> > > > > we could somehow detect the case where someone instantiates the
> > > > > operator (and runs execute) inside a decorated task and give a
> > > > > helpful error in this case? I am afraid people will start using
> > > > > it more and more and the sooner we add protection against it,
> > > > > the better chance we have to contain it.
> > > > >
> > > > > J.
> > >
> > > --
> > > Bolke de Bruin
> > > bdbr...@gmail.com