I guess a PR proposing something for that would be a good start :). I
am fine, for now, with forbidding it "as-is", and then maybe the next step,
once we do that, is to figure out a nicer way of doing it than Hooks.
Maybe it can be done in parallel.

On Fri, Mar 1, 2024 at 9:19 PM Blain David <david.bl...@infrabel.be> wrote:
>
> It's certainly possible to check where a Python method is being called
> from, using traceback.
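A minimal, Airflow-free sketch of this kind of check, for illustration only: `traceback.extract_stack()` returns the current stack as `FrameSummary` objects, oldest first, so inside a function the entry at index `-2` is its direct caller. The names `guarded`, `trusted_runner`, `some_user_code` and `CallerNotAllowed` are hypothetical.

```python
import traceback


class CallerNotAllowed(RuntimeError):
    """Hypothetical error raised when a guarded function is called from an unexpected place."""


def guarded() -> str:
    # extract_stack() lists frames oldest-first: [-1] is this frame, [-2] is the caller.
    caller = traceback.extract_stack()[-2]
    if caller.name != "trusted_runner":
        raise CallerNotAllowed(f"guarded() cannot be called from {caller.name}")
    return "ok"


def trusted_runner() -> str:
    # Allowed: the caller's function name matches the allow-listed name.
    return guarded()


def some_user_code() -> str:
    # Not allowed: any other caller triggers the error.
    return guarded()
```

Matching only on a function name is fragile, since any function with that name would pass; that is presumably why the check in the example below also looks at the caller's file name.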
>
> I do think prohibiting manual calls to an operator's execute method would be
> a good idea. I've also come across this in multiple DAGs; it is ugly and
> looks like a hack.
> Maybe in the beginning we could just print a warning message, and once a new
> minor or major release is out, we could raise an exception instead.
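The warn-first, raise-later transition could look roughly like this. `report_manual_execute`, `ManualExecuteError` and the `STRICT_MANUAL_EXECUTE_CHECK` switch are hypothetical names, and `FutureWarning` is just one reasonable category choice (Python shows it to end users by default).

```python
import warnings
from typing import Optional


class ManualExecuteError(RuntimeError):
    """Hypothetical error raised once the check becomes strict."""


# Hypothetical switch: False during the transition period, True after a later release.
STRICT_MANUAL_EXECUTE_CHECK = False


def report_manual_execute(operator_name: str, strict: Optional[bool] = None) -> None:
    """Warn (transition period) or raise (after the switch) when execute() is called manually."""
    if strict is None:
        # Read the module-level switch at call time so flipping it takes effect immediately.
        strict = STRICT_MANUAL_EXECUTE_CHECK
    message = f"{operator_name}.execute() should not be called manually"
    if strict:
        raise ManualExecuteError(message)
    # stacklevel=2 attributes the warning to the code that called this helper.
    warnings.warn(message, FutureWarning, stacklevel=2)
```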
>
> We could refactor the BaseOperatorMeta class so that it checks where the
> execute method is being called from.
>
> Maybe there are other ways to achieve the same result, but this is one
> possible solution I came up with.
>
> Below is an example of how it could be done, together with a unit test to
> verify its behaviour:
>
> import traceback
> from abc import ABCMeta
> from typing import Any
> from unittest import TestCase
> from unittest.mock import Mock, patch, MagicMock
>
> import pendulum
> from airflow import DAG, settings
> from airflow.exceptions import AirflowException
> from airflow.models import TaskInstance, DagRun
> from airflow.models.baseoperator import BaseOperatorMeta, BaseOperator
> from airflow.utils.context import Context
> from airflow.utils.state import DagRunState
> from assertpy import assert_that
> from mockito import mock, ANY, when, KWARGS
> from sqlalchemy.orm import Session, Query
>
>
> def executor_safeguard():
>     def decorator(func):
>         def wrapper(*args, **kwargs):
>             # TODO: here we would need some kind of switch to detect if we are in testing mode or not as
>             # we want to be able to execute the execute method of the operators directly from within unit tests
>             caller_frame = traceback.extract_stack()[-2]  # Get the caller frame excluding the current frame
>             if caller_frame.name == "_execute_task" and "taskinstance" in caller_frame.filename:
>                 return func(*args, **kwargs)
>             raise AirflowException(f"Method {func.__name__} cannot be called from {caller_frame.name}")
>         return wrapper
>     return decorator
>
>
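> # Keep a reference to the original __new__ so the patched version can delegate to it
> # and BaseOperatorMeta's own class set-up still runs instead of being bypassed.
> _original_base_operator_meta_new = BaseOperatorMeta.__new__
>
>
> def patched_base_operator_meta_new(cls, clsname, bases, clsdict, **kwargs):
>     execute_method = clsdict.get("execute")
>     if callable(execute_method) and not getattr(execute_method, "__isabstractmethod__", False):
>         clsdict["execute"] = executor_safeguard()(execute_method)
>     return _original_base_operator_meta_new(cls, clsname, bases, clsdict, **kwargs)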
>
>
> # for demo purposes, I'm patching the __new__ magic method of BaseOperatorMeta
> BaseOperatorMeta.__new__ = patched_base_operator_meta_new
>
>
> class HelloWorldOperator(BaseOperator):
>     called = False
>
>     def execute(self, context: Context) -> Any:
>         HelloWorldOperator.called = True
>         return f"Hello {self.owner}!"
>
>
> class IterableSession(Session):
>     def __next__(self):
>         pass
>
>
> class ExecutorSafeguardTestCase(TestCase):
>
>     def test_executor_safeguard_when_unauthorized(self):
>         dag = DAG(dag_id="hello_world")
>         context = mock(spec=Context)
>         operator = HelloWorldOperator(task_id="task_id", dag=dag)
>
>         # Only the direct execute() call should raise, not the DAG/operator set-up
>         with self.assertRaises(AirflowException):
>             operator.execute(context=context)
>
>     @patch("sqlalchemy.orm.Session.__init__")
>     def test_executor_safeguard_when_authorized(self, mock_session: MagicMock):
>         query = mock(spec=Query)
>         when(query).filter_by(**KWARGS).thenReturn(query)
>         when(query).filter(ANY).thenReturn(query)
>         when(query).scalar().thenAnswer(lambda: "dag_run_id")
>         when(query).delete()
>         session = mock(spec=IterableSession)
>         when(session).query(ANY).thenReturn(query)
>         when(session).scalar(ANY)
>         when(session).__iter__().thenAnswer(lambda: iter({}))
>         when(session).commit()
>         when(session).close()
>         when(session).execute(ANY)
>         when(session).add(ANY)
>         when(session).flush()
>         when(settings).Session().thenReturn(session)
>
>         mock_session.return_value = session
>
>         dag = DAG(dag_id="hello_world")
>         TaskInstance.get_task_instance = Mock(return_value=None)
>         when(TaskInstance).get_task_instance(
>             dag_id="hello_world",
>             task_id="hello_operator",
>             run_id="run_id",
>             map_index=-1,
>             select_columns=True,
>             lock_for_update=False,
>             session=session,
>         ).thenReturn(None)
>
>         operator = HelloWorldOperator(task_id="hello_operator", dag=dag)
>
>         assert_that(operator.called).is_false()
>
>         task_instance = TaskInstance(task=operator, run_id="run_id")
>         task_instance.task_id = "hello_operator"
>         task_instance.dag_id = "hello_world"
>         task_instance.dag_run = DagRun(
>             run_id="run_id", dag_id="hello_world", execution_date=pendulum.now(), state=DagRunState.RUNNING
>         )
>         task_instance._run_raw_task(test_mode=True, session=session)
>
>         assert_that(operator.called).is_true()
>
> Maybe I should start a pull request for this one? What do you guys think?
>
> Kind regards,
> David
>
> -----Original Message-----
> From: Andrey Anshin <andrey.ans...@taragol.is>
> Sent: Tuesday, 27 February 2024 14:36
> To: dev@airflow.apache.org
> Subject: Re: Bad mixing of decorated and classic operators (users shooting 
> themselves in their foot)
>
>
> I can't see which problem is solved by allowing one operator to run inside
> another.
>
> *Propagate templated fields?*
> In most cases this could be changed for a particular task or an entire
> operator by monkey patching, which in this case is safer than running an
> operator inside an operator. Or it is even available out of the box.
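A dependency-free sketch of the monkey-patching idea, assuming a stand-in operator class that renders only the attributes listed in `template_fields`. Airflow itself renders with Jinja; `string.Template` is swapped in here only to keep the example self-contained, and `OperatorStandIn` and `BashOperatorStandIn` are hypothetical.

```python
from string import Template
from typing import Any, Dict, Sequence


class OperatorStandIn:
    """Hypothetical stand-in: renders only the attributes named in template_fields."""

    template_fields: Sequence[str] = ()

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                # Substitute $placeholders from the context; unknown ones are left untouched.
                setattr(self, name, Template(value).safe_substitute(context))


class BashOperatorStandIn(OperatorStandIn):
    template_fields = ("bash_command",)

    def __init__(self, bash_command: str, cwd: str) -> None:
        self.bash_command = bash_command
        self.cwd = cwd


# Particular task: override template_fields on this one instance only.
task = BashOperatorStandIn("echo $ds", cwd="/data/$ds")
task.template_fields = ("bash_command", "cwd")
task.render_template_fields({"ds": "2024-02-27"})

# Entire operator: patch the class attribute so every instance also renders "cwd".
BashOperatorStandIn.template_fields = (*BashOperatorStandIn.template_fields, "cwd")
other = BashOperatorStandIn("ls $ds", cwd="/data/$ds")
other.render_template_fields({"ds": "2024-02-28"})
```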
>
> *Do an expensive operation to prepare parameters for classic operators?*
> Just calculate it in a separate TaskFlow task and pass its output to the
> classic operator; most of them do not expect a huge input, and their input
> types are primitives.
>
> I'm not sure it is possible to solve everything on top of Task/Operators, e.g.:
> - Run it inside @task.docker, @task.kubernetes, @task.external, @task.virtualenv
> - Run deferrable operators
> - Run rescheduling sensors
>
> Technically we could just run it in a separate executor inside the
> worker, I guess the same way it worked in the past, when the K8S and Celery
> executors had a "Run Task" option.
>
> Another popular case is running an operator inside callbacks. This is
> especially popular for Slack, because guidance from the Airflow 1.10.x era
> still exists on the Internet, even though we have Hooks and Notifiers for
> that. Some links from the first page of Google results for "airflow slack
> notifications":
>
> *Bad Examples:*
> - https://towardsdatascience.com/automated-alerts-for-airflow-with-slack-5c6ec766a823
> - https://www.reply.com/data-reply/en/content/integrating-slack-alerts-in-airflow
> - https://awstip.com/integrating-slack-with-airflow-step-by-step-guide-to-setup-alerts-1dc71d5e65ef
> - https://naiveskill.com/airflow-slack-alert/
> - https://gist.github.com/kzzzr/a2a4152f6a7c03cd984e797c08ac702f
> - https://docs.astronomer.io/learn/error-notifications-in-airflow#legacy-slack-notifications-pre-26
>
> *Good Examples:*
> - https://www.restack.io/docs/airflow-knowledge-apache-providers-slack-webhook-http-pypi-operator
> - https://docs.astronomer.io/learn/error-notifications-in-airflow?tab=taskflow#example-pre-built-notifier-slack
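For contrast with building an operator inside a callback, a dependency-free sketch of a failure callback that formats a message and posts it to a Slack incoming webhook directly. In a real deployment the Slack provider's Notifier or Hook is the better route; `build_failure_payload`, `post_to_webhook` and `make_failure_callback` are hypothetical helpers, and the `task_instance` attributes used (`dag_id`, `task_id`, `try_number`) are assumed to be present in the callback context.

```python
import json
import urllib.request
from types import SimpleNamespace
from typing import Any, Callable, Dict


def build_failure_payload(context: Dict[str, Any]) -> Dict[str, str]:
    """Build a Slack incoming-webhook payload ({"text": ...}) from a callback context."""
    ti = context["task_instance"]
    return {"text": f"Task {ti.dag_id}.{ti.task_id} failed (try {ti.try_number})"}


def post_to_webhook(url: str, payload: Dict[str, str], timeout: float = 10.0) -> int:
    """POST the payload as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


def make_failure_callback(webhook_url: str) -> Callable[[Dict[str, Any]], None]:
    """Return a callback that notifies Slack without instantiating any operator."""

    def callback(context: Dict[str, Any]) -> None:
        post_to_webhook(webhook_url, build_failure_payload(context))

    return callback


# Offline usage example: build the payload from a fake context (no network call).
fake_context = {"task_instance": SimpleNamespace(dag_id="etl", task_id="load", try_number=2)}
print(build_failure_payload(fake_context)["text"])
```

Wiring would then be something like `on_failure_callback=make_failure_callback(url)` on the task, with the URL taken from a connection or secret rather than hard-coded.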
>
> I do not know how to force users not to use this approach, but I guess it is
> better to avoid "If you can't win it, lead it".
>
>
> On Tue, 27 Feb 2024 at 13:48, Jarek Potiuk <ja...@potiuk.com> wrote:
>
> > Yeah. I kinda like (and see emerging from the discussion) the idea that we
> > can (which I love) have our cake and eat it too :). Meaning: I think we can
> > have both 1) and 2)...
> >
> > 1) We should raise an error if someone uses AnOperator in a task context
> > (the way TP described it would work nicely), making the pattern of calling
> > `execute` directly an error.
> > 2) MAYBE we can figure out a way to actually allow users to use the logic
> > that Bolke described in a nice and "supported" way. I would actually love
> > it if we found an easy way to make the 3500+ operators we have immediately
> > available to our TaskFlow users.
> >
> > I don't particularly like the idea of having a run_bash, run_xxxx etc.
> > for all the 3500+ operators we have.
> >
> > But how about just:
> >
> > result = run_operator(BashOperator(...))
> >
> > If we pair it with the error from point 1) (and tell the user "Hey, if
> > you want to do that, you need to do it this way: run_operator(...)") and
> > implement appropriate pre/post processing in the run_operator logic, I see
> > no reason why it would not work.
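A minimal sketch of what a `run_operator()` helper might do, assuming a stand-in operator that mirrors BaseOperator's `pre_execute` / `execute` / `post_execute` method names. `StandInOperator` and `run_operator` are hypothetical, and keying XCom by the inner `task_id` is only one of the possible XCom rules mentioned in this message, not a decided design.

```python
from typing import Any, Callable, Dict, Optional


class StandInOperator:
    """Hypothetical stand-in for an operator with a pre/execute/post lifecycle."""

    def __init__(self, task_id: str, fn: Callable[[Dict[str, Any]], Any]) -> None:
        self.task_id = task_id
        self._fn = fn

    def pre_execute(self, context: Dict[str, Any]) -> None:
        context.setdefault("events", []).append(f"pre:{self.task_id}")

    def execute(self, context: Dict[str, Any]) -> Any:
        return self._fn(context)

    def post_execute(self, context: Dict[str, Any], result: Any = None) -> None:
        context["events"].append(f"post:{self.task_id}")


def run_operator(
    operator: StandInOperator,
    context: Optional[Dict[str, Any]] = None,
    xcom: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper: wrap execute() with the steps a runner would normally own."""
    context = {} if context is None else context
    operator.pre_execute(context)
    result = operator.execute(context)
    operator.post_execute(context, result)
    if xcom is not None:
        # One possible rule: store the result under the inner task_id.
        xcom[operator.task_id] = result
    return result
```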
> > Yes, it is slightly ugly, but only slightly, and it's entirely NOT
> > magical: it is explicit. You would not even have to run `execute()` directly.
> > Heck, we could likely implement run_operator() in a way that works nicely
> > for sensors and deferrable operators and properly handles all the cases,
> > I **think**. And we could properly design the rules for what happens with
> > XCom and task ids (because the question is whether such XCom should belong
> > to the parent task id, or maybe we should have it for an "internal"
> > task_id, etc.).
> >
> > I think that would be super-powerful; as TP wrote, operators would
> > mostly fade away, but not disappear entirely.
> >
> > J.
> >
> >
> >
> > On Tue, Feb 27, 2024 at 10:06 AM Andrey Anshin
> > <andrey.ans...@taragol.is>
> > wrote:
> >
> > > I think manually calling *execute*, *poke*, *choose_branch*, etc.
> > > should be considered an antipattern; these methods should only be
> > > invoked by the Airflow worker.
> > > You never know what needs to be done before them, especially for
> > > deferrable operators or sensors in reschedule mode.
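To make the sensor point concrete: in poke mode, something has to drive `poke()` repeatedly with an interval and a timeout. The simplified, hypothetical `run_poke_loop` below is a sketch of that surrounding loop, not Airflow's implementation; a one-off manual `poke()` call skips it entirely, and reschedule mode or deferral (which release the worker slot between checks) are even harder to reproduce by hand. The clock and sleep functions are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical timeout error for the simplified loop below."""


def run_poke_loop(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poke() until it returns True or the timeout elapses; return the number of pokes."""
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {attempts} pokes")
        sleep(poke_interval)
```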
> > >
> > >
> > >
> > >
> > > On Tue, 27 Feb 2024 at 12:30, Bolke de Bruin <bdbr...@gmail.com> wrote:
> > >
> > > > Interesting! Jarek and I had a bit of a discussion on Slack a
> > > > couple of months ago in the context of Operators Need to Die (if I
> > > > recall correctly).
> > > >
> > > > The idea behind Operators is that you have a vetted way of
> > > > executing certain logic, separate from the boilerplate (i.e.
> > > > execute, post_execute) that makes things work as a Task. A TaskFlow
> > > > task only provides the boilerplate.
> > > > So if we can separate the business logic from the boilerplate and
> > > > generalize it so that it becomes useful in both TaskFlow and classic
> > > > Operators, it would allow for a more flexible pattern in TaskFlow
> > > > and, if required, a more regulated / strict pattern with Operators,
> > > > thus staying backwards compatible.
> > > >
> > > > Jarek then argued for calling those library functions in snake
> > > > case, like so (paraphrasing):
> > > >
> > > > @task(task_id='some_id', provide_context=True)
> > > > def some_dummy_task(**context) -> int:
> > > >     ti = context['ti']
> > > >     cmd2 = 'echo "pushing 2"'
> > > >     return run_bash(cmd=cmd2)
> > > >
> > > > There is little logic in the BashOperator that would prevent us
> > > > from rewiring it so that it also calls out to run_bash_command while
> > > > keeping the same signature.
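A minimal sketch of the library-function split, assuming a hypothetical `run_bash()` (the thread uses both `run_bash` and `run_bash_command`) that returns the last line of stdout, as BashOperator does for XCom, and a hypothetical `BashOperatorSketch` whose `execute()` only delegates to it.

```python
import subprocess
from typing import Any, Dict, Optional


def run_bash(cmd: str, env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None) -> str:
    """Hypothetical library function: run a bash command and return its last stdout line."""
    completed = subprocess.run(
        ["bash", "-c", cmd],
        env=env,  # None inherits the current environment
        cwd=cwd,
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    lines = completed.stdout.strip().splitlines()
    return lines[-1] if lines else ""


class BashOperatorSketch:
    """Hypothetical operator that keeps its signature but delegates to run_bash()."""

    def __init__(self, task_id: str, bash_command: str, env: Optional[Dict[str, str]] = None) -> None:
        self.task_id = task_id
        self.bash_command = bash_command
        self.env = env

    def execute(self, context: Dict[str, Any]) -> str:
        # The business logic lives in run_bash(); the operator only supplies the boilerplate.
        return run_bash(self.bash_command, env=self.env)
```

From a TaskFlow task one would call `run_bash('echo "pushing 2"')` directly, while the classic operator keeps working unchanged on top of the same function.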
> > > >
> > > > Naming is open for debate. It could be a standard pattern like:
> > > >
> > > > BashOperator -> run_bash() or just bash()
> > > > EmailOperator -> run_email() or email()
> > > >
> > > > I *think* I prefer a library pattern for this rather than hooks.
> > > > Hooks just don't feel entirely right; they are primarily there to
> > > > manage connections. In that vein, I can imagine that they register
> > > > themselves upon usage for `on_kill` or something like that, to
> > > > ensure proper resource management. Maybe a HookMixin that gets
> > > > autowired into the library function so that certain things get
> > > > auto-registered (Bash has the subprocess thing; this can be
> > > > generalized).
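One hypothetical shape for the auto-registration idea: library functions register a cleanup callable with a per-task registry, and an `on_kill`-style signal runs them. `KillRegistry` and `start_bash` are illustrative names, not an existing Airflow API.

```python
import subprocess
from typing import Callable, List


class KillRegistry:
    """Hypothetical registry: library functions register cleanup for an on_kill-style signal."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []

    def register(self, callback: Callable[[], None]) -> None:
        self._callbacks.append(callback)

    def on_kill(self) -> None:
        # Run cleanups in reverse registration order (last acquired, first released).
        while self._callbacks:
            self._callbacks.pop()()


def start_bash(cmd: str, registry: KillRegistry) -> subprocess.Popen:
    """Start a bash subprocess and auto-register its termination with the registry."""
    proc = subprocess.Popen(["bash", "-c", cmd])
    registry.register(proc.terminate)
    return proc
```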
> > > >
> > > > Anyway, with the pattern described above, I like option 1 more
> > > > because it distinguishes between opinionated / strict / classic and
> > > > flexible.
> > > >
> > > > my 2 cents,
> > > > Bolke
> > > >
> > > >
> > > > On Tue, 27 Feb 2024 at 08:33, Tzu-ping Chung
> > > > <t...@astronomer.io.invalid
> > >
> > > > wrote:
> > > >
> > > > > This kind of ties back to Bolke’s Operator Must Die manifesto:
> > > > > you shouldn’t use the operator class at all in this situation,
> > > > > but the corresponding hook instead.
> > > > >
> > > > > Regarding preventing this anti-pattern, I can think of two ways:
> > > > >
> > > > > 1. Add a check in BaseOperator to detect whether __init__ is called
> > > > >    in a task worker context, and error out, guiding users to use
> > > > >    hooks instead.
> > > > > 2. Accept this as a valid pattern, and actually push to XCom.
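A sketch of option 1 under a stated assumption: the task runner sets a flag while a task's callable is running, and the operator's `__init__` checks it. A `contextvars.ContextVar` is swapped in here instead of stack inspection; `running_task`, `GuardedOperator` and `OperatorInTaskError` are hypothetical names.

```python
import contextvars
from contextlib import contextmanager
from typing import Iterator

# Hypothetical flag the task runner would set while a task's callable executes.
_inside_running_task = contextvars.ContextVar("_inside_running_task", default=False)


class OperatorInTaskError(RuntimeError):
    """Hypothetical error guiding users toward hooks instead of nested operators."""


@contextmanager
def running_task() -> Iterator[None]:
    """Hypothetical runner-side wrapper around a task's callable."""
    token = _inside_running_task.set(True)
    try:
        yield
    finally:
        # Restore the previous value even if the task raised.
        _inside_running_task.reset(token)


class GuardedOperator:
    """Stand-in for BaseOperator with the option 1 check in __init__."""

    def __init__(self, task_id: str) -> None:
        if _inside_running_task.get():
            raise OperatorInTaskError(
                f"Operator {task_id!r} was created inside a running task; "
                "use the corresponding hook instead."
            )
        self.task_id = task_id
```

Unlike a name-based stack check, the flag does not depend on which function or file calls the constructor, only on whether the runner has marked the current context as "inside a task".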
> > > > >
> > > > > Personally I like option 1 more. Option 2 is admittedly less
> > > > > disruptive, but I’m pretty confident people will find other ways to
> > > > > use operators that do not work. Operators will never die, but they
> > > > > should fade away.
> > > > > <https://en.wikipedia.org/wiki/Old_soldiers_never_die>
> > > > >
> > > > > TP
> > > > >
> > > > >
> > > > > > On 27 Feb 2024, at 15:09, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > > > >
> > > > > > Hello here,
> > > > > >
> > > > > > Recently I have seen, at least a few times, that our users have
> > > > > > started to use a strange pattern:
> > > > > >
> > > > > > @task(task_id='some_id', provide_context=True)
> > > > > > def some_dummy_task(**context):
> > > > > >     ti = context['ti']
> > > > > >     cmd2 = 'echo "pushing 2"'
> > > > > >     dbt_task = BashOperator(task_id='some_dummy_task_id',
> > > > > >                             bash_command=cmd2,
> > > > > >                             do_xcom_push=True)
> > > > > >     dbt_task.execute(context)
> > > > > >
> > > > > > This **works** for them, but then, for example, they complain
> > > > > > that XCom is not pushed (well, of course it's not if you use an
> > > > > > operator and manually run execute).
> > > > > >
> > > > > > Now, while this is obviously a bad pattern, it seems that our
> > > > > > users **think** they can do it. And maybe we can do something to
> > > > > > prevent them from shooting themselves in the foot?
> > > > > >
> > > > > > I do not know the @task decorator magic that well, but maybe we
> > > > > > could somehow detect the case where someone instantiates an
> > > > > > operator (and runs execute) inside a decorated task, and give a
> > > > > > helpful error in this case? I am afraid people will start using it
> > > > > > more and more, and the sooner we add protection against it, the
> > > > > > better chance we have to contain it.
> > > > > >
> > > > > > J.
> > > > >
> > > > >
> > > >
> > > > --
> > > > Bolke de Bruin
> > > > bdbr...@gmail.com
> > > >
> > >
> >
