GitHub user bobojobo created a discussion: Serialization of notifier inputs
Hi,
I am trying to use a notifier but I am running into some issues. Here is a
boiled-down version of my code:
```
from __future__ import annotations

import logging
from typing import Protocol, runtime_checkable

from airflow.sdk import BaseNotifier, Context, DAG, task
from airflow.providers.amazon.aws.operators.eks import EksPodOperator


@runtime_checkable
class OutputProtocol(Protocol):
    def __call__(self, context: Context) -> str | None: ...


class UpdateStatusNotifier(BaseNotifier):
    template_fields = ("output",)

    def __init__(self, output: str | OutputProtocol | None) -> None:
        self.output = output

    def notify(self, context: Context) -> None:
        params = context["params"]
        some_parameter = params["someParameter"]
        # Accept either a plain value or a callable that builds it from the context.
        if isinstance(self.output, (str, type(None))):
            output = self.output
        elif isinstance(self.output, OutputProtocol):
            output = self.output(context=context)
        else:
            raise ValueError(
                f"Output must be a string, None, or OutputProtocol, not {type(self.output)}"
            )
        logging.info(f"Notifying {output=}, {some_parameter=}")


def success_output_callable(context: Context) -> str:
    return f"{context['params']['someparam']}/something.something"


with DAG(
    ...
) as dag:

    @task.external_python(
        task_id="pre",
        python="some/external/python/env",
        serializer="cloudpickle",
        env_vars={"PYTHONPATH": "PYTHONPATH/TO/DAGS"},
    )
    def pre(**kwargs):
        print("preparing...")

    prepare_task = pre()

    exec_k8s_eks = EksPodOperator(
        ...
        on_success_callback=[UpdateStatusNotifier(output=success_output_callable)],
    )

    prepare_task >> exec_k8s_eks
```
When I run this, the `external_python` task `pre` fails with this error:
```
arg_dict = cloudpickle.load(file): source="airflow.utils.process_utils"
^^^^^^^^^^^^^^^^^^^^^^: source="airflow.utils.process_utils"
AttributeError: Can't get attribute 'success_output_callable' on <module
'unusual_prefix_d1679e9cfe1bebd001c0ce44b2e917a565e777c8_dag_dagname'
```
Using pickle also gives an error:
```
PicklingError: Can't pickle <function pre at 0x7f39ec105300>: it's not the same
object as
unusual_prefix_962f620e579056dd11a9021e4d340e7d5a212cc7_dag_dagname.pre
```
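As far as I can tell, this second message is the generic `pickle` failure for a function whose module-level name no longer points at the function itself. Below is a minimal sketch that triggers the same kind of error without Airflow. `Wrapper` and `wrap` are hypothetical stand-ins for whatever `@task.external_python` does to `pre`; that this is what actually happens inside Airflow is an assumption on my part.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for a decorator that replaces the function."""

    def __init__(self, func):
        self.func = func


def wrap(func):
    # After decoration, the module-level name `pre` refers to a Wrapper,
    # not to the original function object.
    return Wrapper(func)


@wrap
def pre():
    print("preparing...")


def try_pickle_original():
    """Try to pickle the undecorated function; return the error text, if any."""
    try:
        # pickle stores functions by reference (module name + qualified name)
        # and checks that the name resolves back to the same object.
        pickle.dumps(pre.func)
    except pickle.PicklingError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_pickle_original())
```

If that is the mechanism, it would explain the plain `pickle` error, but not why the notifier's callable gets pulled into the `cloudpickle` payload in the first place.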
When I pass a string like
`on_success_callback=[UpdateStatusNotifier(output="some string")]` everything
runs fine.
When I remove the `external_python` task `pre` altogether but keep passing the
function to my `UpdateStatusNotifier`, it also runs fine and sends the
notification.
So I am wondering why Airflow wants to pickle my `UpdateStatusNotifier` inputs
when I pass a function and have an `external_python` task in my DAG, especially
since that task doesn't even have a notifier configured.
GitHub link: https://github.com/apache/airflow/discussions/56475