phi-friday opened a new issue, #35263:
URL: https://github.com/apache/airflow/issues/35263

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   When using the Docker operator and the Python operator together, the Celery executor throws a TypeError.
   
   ```log
    [2023-10-30 16:22:36,427: INFO/ForkPoolWorker-7] Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-30T07:22:35.534710+00:00 [queued]> on host 97bf2fd65629
    [2023-10-30 16:22:36,865: ERROR/ForkPoolWorker-7] [081e6272-34c9-4f7f-8d5f-9dd490a1d663] Failed to execute task cannot pickle 'module' object.
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 155, in _execute_in_fork
        args.func(args)
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 113, in wrapper
        return f(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 431, in task_run
        task_return_code = _run_task_by_selected_method(args, _dag, ti)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 209, in _run_task_by_selected_method
        return _run_task_by_local_task_job(args, ti)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 271, in _run_task_by_local_task_job
        ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
        return func(*args, session=session, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 289, in run_job
        return execute_job(job, execute_callable=execute_callable)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 318, in execute_job
        ret = execute_callable()
              ^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
        self.handle_task_exit(return_code)
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
        self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
        return func(*args, session=session, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2748, in schedule_downstream_tasks
        partial_dag = task.dag.partial_subset(
                      ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2417, in partial_subset
        dag.task_dict = {
                        ^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2418, in <dictcomp>
        t.task_id: _deepcopy_task(t)
                   ^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 2415, in _deepcopy_task
        return copy.deepcopy(t, memo)
               ^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/copy.py", line 153, in deepcopy
        y = copier(memo)
            ^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1213, in __deepcopy__
        setattr(result, k, copy.deepcopy(v, memo))
                           ^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/copy.py", line 161, in deepcopy
        rv = reductor(4)
             ^^^^^^^^^^^
    TypeError: cannot pickle 'module' object
    [2023-10-30 16:22:36,890: ERROR/ForkPoolWorker-7] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[081e6272-34c9-4f7f-8d5f-9dd490a1d663] raised unexpected: AirflowException('Celery command failed on host: 97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663')
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
        R = retval = fun(*args, **kwargs)
                     ^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/trace.py", line 760, in __protected_call__
        return self.run(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 121, in execute_command
        _execute_in_fork(command_to_exec, celery_task_id)
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 136, in _execute_in_fork
        raise AirflowException(msg)
    airflow.exceptions.AirflowException: Celery command failed on host: 97bf2fd65629 with celery_task_id 081e6272-34c9-4f7f-8d5f-9dd490a1d663
    [2023-10-30 16:22:37,265: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3] received
    [2023-10-30 16:22:37,273: INFO/ForkPoolWorker-7] [4fb8737b-2a3d-493b-ade6-7edfd6042df3] Executing command in Celery: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-30T07:22:35.534710+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_error.py']
    [2023-10-30 16:22:37,355: INFO/ForkPoolWorker-7] Filling up the DagBag from /opt/airflow/dags/test_error.py
    [2023-10-30 16:22:37,599: INFO/ForkPoolWorker-7] Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-30T07:22:35.534710+00:00 [queued]> on host 97bf2fd65629
    [2023-10-30 16:22:38,885: INFO/ForkPoolWorker-7] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[4fb8737b-2a3d-493b-ade6-7edfd6042df3] succeeded in 1.618334080092609s: None
   ```
   
   The error occurs when the two operators are linked by a dependency.
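
   Looking at the traceback, the failure happens in the "mini scheduler" step: `schedule_downstream_tasks` builds a partial subset of the DAG by deepcopying every task, and `copy.deepcopy` falls back to pickle's reduce protocol for objects it cannot copy structurally. Module objects are not picklable, so any operator attribute holding a module reference would fail exactly like this. A minimal sketch of that mechanism outside Airflow (the `Holder` class is purely illustrative, standing in for the failing operator attribute):

    ```python
    import copy
    import logging


    class Holder:
        """Stand-in for an operator that keeps a module object as an attribute."""

        def __init__(self) -> None:
            # A module reference, analogous to whatever attribute of the
            # decorated operator ends up holding a module.
            self.mod = logging


    # deepcopy has no structural copier for modules, so it falls back to
    # __reduce_ex__(4), which raises: TypeError: cannot pickle 'module' object
    copy.deepcopy(Holder())
    ```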
   
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   In the `apache/airflow:slim-2.7.2-3.11` image:
   
   ```python
    # test_error.py
   from __future__ import annotations
   
   from os import environ
   
   from airflow.decorators import dag, task
   from pendulum.datetime import DateTime
   from pendulum.tz import local_timezone
   
   DEFAULT_ARGS = {
       "image": "python:3.11-slim-bullseye",
       "api_version": "auto",
       "network_mode": "container:airflow-worker",
       "docker_url": "TCP://docker-socket-proxy:2375",
       "auto_remove": "force",
       "mount_tmp_dir": False,
       "container_name": "pickle_error_test",
       "user": environ["AIRFLOW_UID"],
   }
   
   
   @task.python()
   def no_error() -> None:
       import logging
   
       logger = logging.getLogger("airflow.task")
       logger.info("in celery")
   
   
   @task.docker()
   def pickle_error() -> None:
       import logging
   
       logger = logging.getLogger("airflow.task")
       logger.info("in docker")
   
   
   @dag(
       start_date=DateTime.now(local_timezone()).replace(
           hour=0, minute=0, second=0, microsecond=0
       ),
       schedule=None,
       default_args=DEFAULT_ARGS | {"do_xcom_push": False},
       catchup=False,
   )
   def test_docker_task_error() -> None:
       in_celery = no_error()
       in_docker = pickle_error()
        # Without the `_ =` assignment (plain `in_celery >> in_docker`),
        # the error does not occur.
        _ = in_celery >> in_docker
   
   
   test_docker_task_error()
   
   ```
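
   To help narrow down which attribute carries the module, here is a hedged diagnostic sketch (not part of the original report; it assumes the `@dag` call returns the DAG object, as it does in Airflow 2.x, and the `dag_obj` name is mine). Replace the bare `test_docker_task_error()` call at the end of the file with:

    ```python
    import types

    # Scan each task's instance attributes for module objects, the type
    # copy.deepcopy cannot handle, to see which operator triggers the error.
    dag_obj = test_docker_task_error()
    for t in dag_obj.tasks:
        for name, value in vars(t).items():
            if isinstance(value, types.ModuleType):
                print(f"{t.task_id}.{name} -> {value!r}")
    ```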
   
   ### Operating System
   
   Ubuntu 22.04.1 LTS
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-celery==3.3.4
   apache-airflow-providers-common-sql==1.7.2
   apache-airflow-providers-docker==3.7.5
   apache-airflow-providers-ftp==3.5.2
   apache-airflow-providers-http==4.5.2
   apache-airflow-providers-imap==3.3.2
   apache-airflow-providers-odbc==4.0.0
   apache-airflow-providers-postgres==5.6.1
   apache-airflow-providers-redis==3.3.2
   apache-airflow-providers-sqlite==3.4.3
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   A pickle error is thrown in the executor, but the task itself runs and exits normally (state: SUCCESS).
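
   If the traceback is read correctly, the error happens in the local task job's mini scheduler (`handle_task_exit` -> `schedule_downstream_tasks`) after the task has already finished, which would explain the SUCCESS state. As an untested mitigation, setting `[scheduler] schedule_after_task_execution = False` (or `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False`) should skip that code path, at the cost of the mini-scheduler optimization.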
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

