It's certainly possible to check from where a Python method is being called 
using traceback.
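The idea, in a minimal standalone form (not Airflow code; function names here are illustrative):

```python
import traceback


def guarded():
    # extract_stack() returns frames oldest-first: [-1] is this frame,
    # [-2] is whoever called guarded().
    caller = traceback.extract_stack()[-2]
    return caller.name


def allowed_caller():
    # guarded() sees this function's name as its caller
    return guarded()
```

Calling `allowed_caller()` returns `"allowed_caller"`, while calling `guarded()` directly at module level yields `"<module>"`, so the guarded function can decide whether its caller is legitimate.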

I do think prohibiting manual calls to an operator's execute method would be a 
good idea; I've also come across this in multiple DAGs, and it's ugly and looks 
like a hack.
Maybe in the beginning we could just print a warning message, and then, once a 
minor or major release occurs, raise an exception instead.
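The warn-now, raise-later phase could look something like this (a sketch; the flag name is hypothetical, and RuntimeError stands in for AirflowException to keep the snippet self-contained):

```python
import warnings

# Hypothetical switch: False during the deprecation window, flipped to True
# (or removed) in the minor/major release where this becomes an error.
RAISE_ON_DIRECT_EXECUTE = False


def report_direct_execute(method_name: str, caller_name: str) -> None:
    message = (
        f"Method {method_name} should not be called from {caller_name}; "
        "it is meant to be invoked by the task runner only."
    )
    if RAISE_ON_DIRECT_EXECUTE:
        raise RuntimeError(message)
    # stacklevel=3 points the warning at the user's call site,
    # past this helper and the wrapping decorator
    warnings.warn(message, DeprecationWarning, stacklevel=3)
```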

We could refactor the BaseOperatorMeta class so that we check from where the 
execute method is being called.

Maybe there are other ways to achieve the same thing, but this is one of the 
possible solutions I came up with.

Below is an example of how it could be done, together with a unit test to 
verify its behaviour:

import traceback
from abc import ABCMeta
from typing import Any
from unittest import TestCase
from unittest.mock import Mock, patch, MagicMock

import pendulum
from airflow import AirflowException, DAG, settings
from airflow.models import TaskInstance, DagRun
from airflow.models.baseoperator import BaseOperatorMeta, BaseOperator
from airflow.utils.context import Context
from airflow.utils.state import DagRunState
from assertpy import assert_that
from mockito import mock, ANY, when, KWARGS
from sqlalchemy.orm import Session, Query


def executor_safeguard():
    def decorator(func):
        def wrapper(*args, **kwargs):
            # TODO: here we would need some kind of switch to detect if we are in testing mode or not,
            # as we want to be able to call the execute method of the operators directly from within unit tests
            caller_frame = traceback.extract_stack()[-2]  # the caller frame, excluding the current frame
            if caller_frame.name == "_execute_task" and "taskinstance" in caller_frame.filename:
                return func(*args, **kwargs)
            raise AirflowException(f"Method {func.__name__} cannot be called from {caller_frame.name}")
        return wrapper
    return decorator
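The TODO above (a switch for test mode) could be sketched as a class-level flag that unit tests flip on. Everything here is hypothetical naming (ExecutorSafeguard, test_mode), and RuntimeError stands in for AirflowException so the snippet is self-contained:

```python
import functools
import traceback


class ExecutorSafeguard:
    # Hypothetical switch: unit tests set ExecutorSafeguard.test_mode = True
    # so they can call execute() on an operator directly.
    test_mode = False

    @classmethod
    def decorator(cls, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if cls.test_mode:
                return func(*args, **kwargs)
            caller = traceback.extract_stack()[-2]
            if caller.name == "_execute_task" and "taskinstance" in caller.filename:
                return func(*args, **kwargs)
            # RuntimeError stands in for AirflowException in this sketch
            raise RuntimeError(f"Method {func.__name__} cannot be called from {caller.name}")
        return wrapper
```

With `test_mode = True` the wrapped method runs unconditionally; with it off, any caller other than `_execute_task` in `taskinstance.py` gets the exception.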


def patched_base_operator_meta_new(cls, clsname, bases, clsdict):
    execute_method = clsdict.get("execute")
    if callable(execute_method) and not getattr(execute_method, '__isabstractmethod__', False):
        clsdict["execute"] = executor_safeguard()(execute_method)
    return ABCMeta.__new__(cls, clsname, bases, clsdict)


# for demo purposes, I'm patching the __new__ magic method of BaseOperatorMeta
BaseOperatorMeta.__new__ = patched_base_operator_meta_new


class HelloWorldOperator(BaseOperator):
    called = False

    def execute(self, context: Context) -> Any:
        HelloWorldOperator.called = True
        return f"Hello {self.owner}!"


class IterableSession(Session):
    def __next__(self):
        pass


class ExecutorSafeguardTestCase(TestCase):

    def test_executor_safeguard_when_unauthorized(self):
        with self.assertRaises(AirflowException):
            dag = DAG(dag_id="hello_world")
            context = mock(spec=Context)

            HelloWorldOperator(task_id="task_id", dag=dag).execute(context=context)

    @patch("sqlalchemy.orm.Session.__init__")
    def test_executor_safeguard_when_authorized(self, mock_session: MagicMock):
        query = mock(spec=Query)
        when(query).filter_by(**KWARGS).thenReturn(query)
        when(query).filter(ANY).thenReturn(query)
        when(query).scalar().thenAnswer(lambda: "dag_run_id")
        when(query).delete()
        session = mock(spec=IterableSession)
        when(session).query(ANY).thenReturn(query)
        when(session).scalar(ANY)
        when(session).__iter__().thenAnswer(lambda: iter({}))
        when(session).commit()
        when(session).close()
        when(session).execute(ANY)
        when(session).add(ANY)
        when(session).flush()
        when(settings).Session().thenReturn(session)

        mock_session.return_value = session

        dag = DAG(dag_id="hello_world")
        TaskInstance.get_task_instance = Mock(return_value=None)
        when(TaskInstance).get_task_instance(
            dag_id="hello_world",
            task_id="hello_operator",
            run_id="run_id",
            map_index=-1,
            select_columns=True,
            lock_for_update=False,
            session=session,
        ).thenReturn(None)

        operator = HelloWorldOperator(task_id="hello_operator", dag=dag)

        assert_that(operator.called).is_false()

        task_instance = TaskInstance(task=operator, run_id="run_id")
        task_instance.task_id = "hello_operator"
        task_instance.dag_id = "hello_world"
        task_instance.dag_run = DagRun(
            run_id="run_id", dag_id="hello_world", execution_date=pendulum.now(), state=DagRunState.RUNNING
        )
        task_instance._run_raw_task(test_mode=True, session=session)

        assert_that(operator.called).is_true()

Maybe I should start a pull request for this one? What do you guys think?

Kind regards,
David

-----Original Message-----
From: Andrey Anshin <andrey.ans...@taragol.is>
Sent: Tuesday, 27 February 2024 14:36
To: dev@airflow.apache.org
Subject: Re: Bad mixing of decorated and classic operators (users shooting 
themselves in their foot)


I can't see which problem is solved by allowing one operator to run inside 
another.

*Propagate templated fields?*
In most cases this can be changed for a particular task, or for the entire 
operator, by monkey patching, which is safer than running an operator inside an 
operator. Or it may even be available out of the box.

*Do an expensive operation for preparing parameters to the classical
operators?*
Well, just calculate it in a separate TaskFlow task and propagate the output 
into the classical operator; most of them do not expect a huge input, and the 
types are primitives.

I'm not sure it is possible to solve everything on top of Tasks/Operators, 
e.g.:
- Run it into the @task.docker, @task.kubernetes, @task.external, 
@task.virtualenv
- Run deferrable operators
- Run rescheduling sensors

Technically we could just run it in a separate executor inside the worker; I 
guess this is the same way it worked in the past, when the K8S and Celery 
executors had a "Run Task" option.

Another popular case is running an operator inside callbacks, which is pretty 
popular for Slack, because guidance from the Airflow 1.10.x era still exists on 
the Internet even though we have Hooks and Notifiers for that. Some links from 
the first page of Google results for an "airflow slack notifications" query:

*Bad Examples:*
-
https://towardsdatascience.com/automated-alerts-for-airflow-with-slack-5c6ec766a823
-
https://www.reply.com/data-reply/en/content/integrating-slack-alerts-in-airflow
-
https://awstip.com/integrating-slack-with-airflow-step-by-step-guide-to-setup-alerts-1dc71d5e65ef
- https://naiveskill.com/airflow-slack-alert/
- https://gist.github.com/kzzzr/a2a4152f6a7c03cd984e797c08ac702f
-
https://docs.astronomer.io/learn/error-notifications-in-airflow#legacy-slack-notifications-pre-26

*Good Examples:*
-
https://www.restack.io/docs/airflow-knowledge-apache-providers-slack-webhook-http-pypi-operator
-
https://docs.astronomer.io/learn/error-notifications-in-airflow?tab=taskflow#example-pre-built-notifier-slack

I do not know how to force users not to use this approach, but I guess it is 
better to avoid an "if you can't beat them, join them" attitude here.


On Tue, 27 Feb 2024 at 13:48, Jarek Potiuk <ja...@potiuk.com> wrote:

> Yeah. I kinda like (and see it emerging from the discussion) that we
> can (which I love) have cake and eat it too :). Means - I think we can
> have both 1) and 2) ...
>
> 1) We should raise an error if someone uses AnOperator in task context
> (the way TP described it would work nicely) - making calling the
> `execute` pattern directly wrong
> 2) MAYBE we can figure out a way to actually allow the users to use
> the logic that Bolke described in a nice, and "supported" way. I would
> actually love it, if we find an easy way to make the 3500+ operators
> we have - immediately available to our taskflow users.
>
> I don't particularly like the idea of having a run_bash, run_xxxx etc.
> for all the 3500+ operators we have.
>
> But how about just:
>
> result = run_operator(BashOperator(.,...))
>
> If we pair it with the error from point 1) (and tell the user "Hey, if
> you want to do that you need to do it this way: run_operator(...") and
> implement appropriate pre/post processing in run_operator logic, I see
> no reason why it would not work..
> Yes - it is slightly ugly, but only slightly - and it's entirely NOT
> magical and explicit. You would not even have to run `execute()` directly.
> Heck we could likely implement the run_operator() in the way to nicely
> work for sensors and deferrable operators and properly implement
> handling of all the cases I **think**. And properly design the rules
> on what happens with xcom and task ids (because the question is
> whether such xcom should belong to the parent task id or maybe we
> should have it for "internal" task_id etc. etc.
>
> I think that would be super-powerful, as TP wrote - operator would
> mostly fade away - but not disappear entirely
>
> J.
>
>
>
> On Tue, Feb 27, 2024 at 10:06 AM Andrey Anshin
> <andrey.ans...@taragol.is>
> wrote:
>
> > I think manually using *execute*, *poke*, *choose_branch* and etc.
> > should be considered as an antipattern, these methods only should be
> > invoked by Airflow Worker.
> > You never know what should be done before, especially if it about
> > deferrable operator or sensors in reschedule mode
> >
> >
> >
> >
> > On Tue, 27 Feb 2024 at 12:30, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >
> > > Interesting! Jarek and I had a bit of a discussion on slack a
> > > couple of months ago in the context of Operators Need to Die (if I
> > > recall
> > correctly).
> > >
> > > The idea behind Operators is that you have a vetted way of
> > > executing a certain logic apart from the boilerplate (i.e.
> > > execute, post_execute)
> > that
> > > makes things work as a Task. A TaskFlow task only provides the
> > boilerplate.
> > > So if we can separate out the business logic from the boilerplate
> > > and generalize it as such so that it becomes useful in both
> > > TaskFlow and classic Operators it would allow for a more flexible
> > > pattern in
> TaskFlow
> > > and if required a more regulated / strict pattern with Operators
> > > and
> thus
> > > being backwards compatible.
> > >
> > > Jarek argued for snake case calling of those library functions
> > > then
> like
> > so
> > > (paraphrasing):
> > >
> > > @task(task_id='some_id', provide_context=True)
> > > def some_dummy_task(**context) -> int:
> > >     ti = context['ti']
> > >     cmd2 = 'echo "pushing 2"'
> > >     return run_bash(cmd=cmd2)
> > >
> > > There is little logic in the BashOperator that would prevent us
> > > from rewiring it so that it also calls out run_bash_command while
> > > keeping
> the
> > > same signature.
> > >
> > > Naming could be argued upon. It could be a standard pattern like:
> > >
> > > BashOperator -> run_bash() or just bash()
> > > EmailOperator -> run_email() or email()
> > >
> > > I *think* I prefer a library pattern for this rather than hooks.
> > > Hooks
> > just
> > > don't feel entirely right. They are primarily there to manage
> > connections.
> > > In that way I can imagine that they register themselves upon usage
> > > for `on_kill` or something like that to ensure proper resource management.
> > > Maybe a HookMixin that gets autowired into the library function so
> > > that certain things get auto registered (Bash has the subprocess
> > > thing, this
> > can
> > > be generalized).
> > >
> > > Anyways with the above described pattern I like option 1 more
> > > because
> it
> > > distinguishes between opinionated / strict / classic and flexible.
> > >
> > > my 2 cents,
> > > Bolke
> > >
> > >
> > > On Tue, 27 Feb 2024 at 08:33, Tzu-ping Chung
> > > <t...@astronomer.io.invalid
> >
> > > wrote:
> > >
> > > > This kind of ties back to Bolke’s Operator Must Die
> > > > manifesto—you shouldn’t use the operator class here at all in
> > > > this situation, but
> the
> > > > corresponding hook instead.
> > > >
> > > > Regarding preventing this anti-pattern, I can think of two ways:
> > > >
> > > > Add a check in BaseOperator to detect whether __init__ is called
> > > > in a
> > > task
> > > > work context, and error out guiding users to use hooks instead.
> > > > Accept this as a valid pattern, and actually push to XCom.
> > > >
> > > > Personally I like options 1 more. Option 2 is admittedly less
> > disruptive,
> > > > but I’m pretty confident people will find other ways to use
> > > > operators
> > > that
> > > > do not work. Operators will never die, but they should fade
> > > > away. <https://en.wikipedia.org/wiki/Old_soldiers_never_die>
> > > >
> > > > TP
> > > >
> > > >
> > > > > On 27 Feb 2024, at 15:09, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > > >
> > > > > Hello here,
> > > > >
> > > > > I have seen recently at least a few times that our users
> > > > > started to
> > > use a
> > > > > strange pattern
> > > > >
> > > > > @task(task_id='some_id', provide_context=True)
> > > > > def some_dummy_task(**context):
> > > > >     ti = context['ti']
> > > > >     cmd2 = 'echo "pushing 2"'
> > > > >     dbt_task = BashOperator(task_id='some_dummy_task_id',
> > > > >                             bash_command=cmd2,
> > > > >                             do_xcom_push=True)
> > > > >     dbt_task.execute(context)
> > > > >
> > > > > This **works** for them but then - for example - they complain
> > > > > that
> > > xcom
> > > > is
> > > > > not pushed (well of course it's not if you use operator and
> manually
> > > run
> > > > > execute).
> > > > >
> > > > > Now - while this is a bad pattern - obviously - it seems that
> > > > > our
> > users
> > > > > **think** they can do it. And maybe we can do something to
> > > > > prevent
> > > them
> > > > > shooting themselves in their foot?
> > > > >
> > > > > I do not know that well the @task decoration magic, but maybe
> > > > > we
> > could
> > > > > somehow detect the case where someone instantiates the
> > > > > operator
> (and
> > > > runs
> > > > > execute) inside a decorated task and give a helpful error in
> > > > > this
> > > case? I
> > > > > am afraid people will start using it more and more and the
> > > > > sooner
> we
> > > add
> > > > > protection against it, the better chance we have to contain it.
> > > > >
> > > > > J.
> > > >
> > > >
> > >
> > > --
> > >
> > > --
> > > Bolke de Bruin
> > > bdbr...@gmail.com
> > >
> >
>
