GitHub user bobojobo edited a discussion: Serialization of notifier inputs

[airflow 3.0.3]

Hi,
I am trying to use a notifier but I am running into some issues. Here is a
boiled-down version of my code:

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable
import logging

from airflow.sdk import BaseNotifier, Context, DAG, task
from airflow.providers.amazon.aws.operators.eks import EksPodOperator


@runtime_checkable
class OutputProtocol(Protocol):
    def __call__(self, context: Context) -> str | None: ...


class UpdateStatusNotifier(BaseNotifier):
    template_fields = ("output",)

    def __init__(self, output: str | OutputProtocol | None) -> None:
        self.output = output

    def notify(self, context: Context) -> None:

        params = context["params"]
        some_parameter = params["someParameter"]

        if isinstance(self.output, (str, type(None))):
            output = self.output
        elif isinstance(self.output, OutputProtocol):
            output = self.output(context=context)
        else:
            raise ValueError(
                f"Output must be a string, None, or OutputProtocol, not {type(self.output)}"
            )

        logging.info(f"Notifying {output=}, {some_parameter=}")


def success_output_callable(context: Context) -> str:
    # Context is dict-like, so read params by key (same key as in notify above).
    return f"{context['params']['someParameter']}/something.something"


with DAG(
    ...
) as dag:

    @task.external_python(
        task_id="pre",
        python="some/external/python/env",
        serializer="cloudpickle",
        env_vars={"PYTHONPATH": "PYTHONPATH/TO/DAGS"},
    )
    def pre(**kwargs):
        print("preparing...")

    prepare_task = pre()

    exec_k8s_eks = EksPodOperator(
        ...
        on_success_callback=[UpdateStatusNotifier(output=success_output_callable)],
    )

    prepare_task >> exec_k8s_eks
```

When I run this, the external_python task `pre` fails with this error:

```
     arg_dict = cloudpickle.load(file): source="airflow.utils.process_utils"
                ^^^^^^^^^^^^^^^^^^^^^^: source="airflow.utils.process_utils"
 AttributeError: Can't get attribute 'success_output_callable' on <module 'unusual_prefix_d1679e9cfe1bebd001c0ce44b2e917a565e777c8_dag_dagname'
```
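
The message has the shape of a by-reference unpickle failure: the payload stores only a module name and an attribute name, and the lookup happens in the process that loads it. The sketch below reproduces that shape with the standard-library `pickle`, outside Airflow. `fake_dag_module` is a hypothetical stand-in for the DAG file module, and whether cloudpickle takes the same by-reference path in this DAG is an assumption, not something confirmed from the Airflow source.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the DAG file module (the real one gets a generated name).
dag_module = types.ModuleType("fake_dag_module")
sys.modules[dag_module.__name__] = dag_module


def success_output_callable(context):
    return "something.something"


# Make the function look as if it were defined at the top level of that module.
success_output_callable.__module__ = dag_module.__name__
success_output_callable.__qualname__ = "success_output_callable"
dag_module.success_output_callable = success_output_callable

# pickle stores a reference (module name + attribute name), not the function body.
payload = pickle.dumps(success_output_callable)

# Simulate a loading process where the module exists but the attribute does not.
del dag_module.success_output_callable

load_error = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    load_error = exc

print(type(load_error).__name__, load_error)
```

If this is what happens in the DAG, the module that the external interpreter imports under the `unusual_prefix_...` name would be missing `success_output_callable` at load time.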

Using pickle also gives an error:
```
PicklingError: Can't pickle <function pre at 0x7f39ec105300>: it's not the same object as unusual_prefix_962f620e579056dd11a9021e4d340e7d5a212cc7_dag_dagname.pre
```
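
The standard-library `pickle` raises this kind of error when a function's module-level name no longer points at the function itself, for example after a decorator or a later assignment rebinds it. A minimal sketch of that mechanism, independent of Airflow, follows. `fake_dag_module_for_pickle` is a hypothetical stand-in module, and the idea that `@task.external_python` rebinding `pre` is the trigger here is an assumption.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the DAG file module.
dag_module = types.ModuleType("fake_dag_module_for_pickle")
sys.modules[dag_module.__name__] = dag_module


def pre():
    print("preparing...")


# Make the function look as if it were defined at the top level of that module.
pre.__module__ = dag_module.__name__
pre.__qualname__ = "pre"

# Simulate a decorator rebinding the module-level name to a different object.
dag_module.pre = object()

dump_error = None
try:
    # pickle looks up fake_dag_module_for_pickle.pre and finds a different object.
    pickle.dumps(pre)
except pickle.PicklingError as exc:
    dump_error = exc

print(type(dump_error).__name__, dump_error)
```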

When I pass a string like 
`on_success_callback=[UpdateStatusNotifier(output="some string")]` everything 
runs fine. 

When I remove the external_python task `pre` altogether but keep passing the
function to my `UpdateStatusNotifier`, it also runs fine and does the
notification.

So I am wondering why Airflow wants to pickle my `UpdateStatusNotifier` inputs
when I pass a function and have an external_python task in my DAG, especially
because that task doesn't even have a notifier configured.

GitHub link: https://github.com/apache/airflow/discussions/56475
