phi-friday opened a new issue, #35263: URL: https://github.com/apache/airflow/issues/35263
### Apache Airflow version

2.7.2

### What happened

When using the Docker operator and the Python operator together, the Celery executor throws a `TypeError`.

```log
[2023-10-30 16:22:36,427: INFO/ForkPoolWorker-7] Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-30T07:22:35.534710+00:00 [queued]> on host 97bf2fd65629
[2023-10-30 16:22:36,865: ERROR/ForkPoolWorker-7] [081e6272-34c9-4f7f-8d5f-9dd490a1d663] Failed to execute task cannot pickle 'module' object.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 155, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 431, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 209, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 271, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 289, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 318, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
    self.handle_task_exit(return_code)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
    self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2748, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2417, in partial_subset
    dag.task_dict = {
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2418, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2415, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.11/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1213, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.11/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
[2023-10-30 16:22:36,890: ERROR/ForkPoolWorker-7] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[081e6272-34c9-4f7f-8d5f-9dd490a1d663] raised unexpected: AirflowException('Celery command failed on host: 97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663')
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 121, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 136, in _execute_in_fork
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: 97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663
[2023-10-30 16:22:37,265: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3] received
[2023-10-30 16:22:37,273: INFO/ForkPoolWorker-7] [4fb8737b-2a3d-493b-ade6-7edfd6042df3] Executing command in Celery: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-30T07:22:35.534710+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_error.py']
[2023-10-30 16:22:37,355: INFO/ForkPoolWorker-7] Filling up the DagBag from /opt/airflow/dags/test_error.py
[2023-10-30 16:22:37,599: INFO/ForkPoolWorker-7] Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-30T07:22:35.534710+00:00 [queued]> on host 97bf2fd65629
[2023-10-30 16:22:38,885: INFO/ForkPoolWorker-7] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3] succeeded in 1.618334080092609s: None
```

The error occurs when the two operators are linked by a dependency.

### What you think should happen instead

_No response_

### How to reproduce

In `apache/airflow:slim-2.7.2-3.11`:

```python
# test_dag.py
from __future__ import annotations

from os import environ

from airflow.decorators import dag, task
from pendulum.datetime import DateTime
from pendulum.tz import local_timezone

DEFAULT_ARGS = {
    "image": "python:3.11-slim-bullseye",
    "api_version": "auto",
    "network_mode": "container:airflow-worker",
    "docker_url": "TCP://docker-socket-proxy:2375",
    "auto_remove": "force",
    "mount_tmp_dir": False,
    "container_name": "pickle_error_test",
    "user": environ["AIRFLOW_UID"],
}


@task.python()
def no_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in celery")


@task.docker()
def pickle_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in docker")


@dag(
    start_date=DateTime.now(local_timezone()).replace(
        hour=0, minute=0, second=0, microsecond=0
    ),
    schedule=None,
    default_args=DEFAULT_ARGS | {"do_xcom_push": False},
    catchup=False,
)
def test_docker_task_error() -> None:
    in_celery = no_error()
    in_docker = pickle_error()
    # Removing the underscore will not cause an error.
    _ = in_celery >> in_docker


test_docker_task_error()
```

### Operating System

Ubuntu 22.04.1 LTS

### Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.3.4
apache-airflow-providers-common-sql==1.7.2
apache-airflow-providers-docker==3.7.5
apache-airflow-providers-ftp==3.5.2
apache-airflow-providers-http==4.5.2
apache-airflow-providers-imap==3.3.2
apache-airflow-providers-odbc==4.0.0
apache-airflow-providers-postgres==5.6.1
apache-airflow-providers-redis==3.3.2
apache-airflow-providers-sqlite==3.4.3

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else

The pickle error is thrown in the executor, but the operator itself runs and exits normally (state: SUCCESS).

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
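The traceback bottoms out in `copy.deepcopy` inside `DAG.partial_subset`, which deep-copies every task when scheduling downstream tasks. The underlying failure mode can be reproduced in isolation without Airflow: deep-copying any object that keeps a module object in an instance attribute raises the same `TypeError`. The `FakeOperator` class below is a hypothetical stand-in, not Airflow code, sketched only to show the deepcopy behavior:

```python
import copy
import logging  # any imported module object will do


class FakeOperator:
    # Hypothetical stand-in for an operator that (somewhere in its
    # attribute graph) holds a reference to a module object, as the
    # `__deepcopy__` frame in the traceback suggests.
    def __init__(self) -> None:
        self.log_module = logging  # module objects cannot be pickled/deep-copied


try:
    copy.deepcopy(FakeOperator())
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # e.g. "cannot pickle 'module' object"
```

`copy.deepcopy` falls back to the pickle protocol (`__reduce_ex__`, the `rv = reductor(4)` frame in the log) for types it has no dedicated copier for, and modules are explicitly unpicklable, which is why the deepcopy of the task in `partial_subset` fails.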