ImmortalLotus opened a new issue, #35179: URL: https://github.com/apache/airflow/issues/35179
### Apache Airflow version

2.7.2

### What happened

Airflow throws the error below when terminating a task that comes from a DAG created with the DAG decorator, and it then cannot delete the pod when using the Kubernetes Executor. The task itself runs correctly; only when the pod is being killed does this error appear, and the pod is then not deleted.

Exit log:

```
[2023-10-25T13:23:21.856+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 0
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 60, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 430, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 208, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 270, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airf
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 318, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
    self.handle_task_exit(return_code)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
    self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2754, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2393, in partial_subset
    dag.task_dict = {
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2394, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2391, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1214, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
```

### What you think should happen instead

It should terminate correctly and delete the pod.
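The tail of the traceback suggests `BaseOperator.__deepcopy__` recursed through the operator's attributes until it reached a module object, which `copy.deepcopy` cannot handle. A minimal sketch (hypothetical, not taken from our DAGs) that raises the same error:

```
import copy
import types  # any module object will do; it stands in for whatever a DB driver/engine keeps around


class Holder:
    def __init__(self):
        self.mod = types  # an attribute whose value is a module


copy.deepcopy(Holder())  # TypeError: cannot pickle 'module' object
```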
### How to reproduce

Use the following class (which is still not perfect) to generate a DAG like the one below. The purpose of this class is to encapsulate some common usage we have when working on specific EL jobs:

```
from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any

import httpx
import pendulum
from docker.types import Mount
from kubernetes.client import models as k8s
from sqlalchemy import create_engine

from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
    from airflow.utils.context import Context


# [START dag_decorator_usage]
class sql_dag:
    def __init__(self, nome_servidor_destino, servidor_origem_dict: dict,
                 nome_banco_destino, nome_banco_origem, schema):
        conexao = MsSqlHook.get_connection(nome_servidor_destino)
        self.hook_banco_destino = MsSqlHook(mssql_conn_id=nome_servidor_destino)
        self.engine_destino = create_engine(
            f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}'
            '?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES'
        )
        for chave, valor in servidor_origem_dict.items():
            if valor == 'Postgres':
                self.hook_banco_origem = PostgresHook(postgres_conn_id=chave)
            elif valor == 'MSSQL':
                self.hook_banco_origem = MsSqlHook(mssql_conn_id=chave)
            else:
                raise Exception("unsupported database")
        self.schema = schema
        self.sql_template_padrao = f'use [{nome_banco_origem}] select * from '
        self.truncate_sql = f"""use [{nome_banco_destino}] truncate table """

    def configurar_dag(self, horario: str, nome: str, origem, acao, destino):
        tabela_dict = Variable.get(nome, default_var={"erro": ["erro"]}, deserialize_json=True)

        def truncate_tabelas(**kwargs):
            sql3 = kwargs['self'].truncate_sql + kwargs['tabela']
            kwargs['self'].hook_banco_destino.run(sql3)

        def insert_tabelas(**kwargs):
            sql4 = kwargs['self'].sql_template_padrao + kwargs['tabela']
            df = kwargs['self'].hook_banco_origem.get_pandas_df(sql4)
            df.to_sql(kwargs['tabela'], schema=kwargs['self'].schema,
                      con=kwargs['self'].engine_destino, if_exists='append', index=False)

        @dag(
            schedule=horario,
            dag_id=nome,
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            catchup=False,
            tags=[origem, acao, destino],
        )
        def criar():
            start = DummyOperator(task_id='comecaDag')
            end = DummyOperator(task_id='endDag')

            with TaskGroup(group_id="insert_tabelas") as insert:
                se_estiver_vazio = DummyOperator(task_id='pra_evitar_erros')
                for chave, valor in tabela_dict.items():
                    if valor == 'padraoSemDtInsert':
                        task_insert_tabelas = PythonOperator(
                            task_id=f'insert_{chave}',
                            python_callable=insert_tabelas,
                            op_kwargs={"tabela": chave, "self": self},
                        )

            with TaskGroup(group_id="truncate_tabelas") as truncate:
                se_estiver_vazio2 = DummyOperator(task_id='pra_evitar_erros2')
                for chave, valor in tabela_dict.items():
                    task_truncate_tabelas = PythonOperator(
                        task_id=f'truncate_{chave}',
                        python_callable=truncate_tabelas,
                        op_kwargs={"tabela": chave, "self": self},
                    )

            start >> truncate >> insert >> end

        dag_teste = criar()
```
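A note on what seems to trigger this: each `PythonOperator` above receives the whole `sql_dag` instance through `op_kwargs={"self": self}`, so every task carries the hooks and the SQLAlchemy engine built in `__init__`, and those hold module references that `deepcopy` cannot copy. A hedged sketch of a possible rework (untested; `insert_tabelas_v2` and its parameter names are mine, not from our codebase) that keeps only plain strings in `op_kwargs` and creates the hook and engine at run time inside the callable:

```
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from sqlalchemy import create_engine


def insert_tabelas_v2(tabela: str, conn_id_origem: str, conn_id_destino: str,
                      nome_banco_origem: str, nome_banco_destino: str, schema: str, **_):
    # Hook and engine are created inside the task at run time, so the
    # operator's op_kwargs hold only strings and deep-copy cleanly.
    hook_origem = MsSqlHook(mssql_conn_id=conn_id_origem)
    df = hook_origem.get_pandas_df(f'use [{nome_banco_origem}] select * from {tabela}')

    conexao = MsSqlHook.get_connection(conn_id_destino)
    engine_destino = create_engine(
        f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}'
        '?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES'
    )
    df.to_sql(tabela, schema=schema, con=engine_destino, if_exists='append', index=False)
```

The `PythonOperator` would then pass the same values as strings, e.g. `op_kwargs={"tabela": chave, "conn_id_origem": ..., ...}`.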
Code to use the above class; it works with an MSSQL destination and a PGSQL/MSSQL source:

```
from cagd_libs.BANCOS_SQL import sql_dag
from airflow.models import Variable

tabela_dict = {"YourTable": "padraoSemDtInsert"}
nome_variavel = "ID_Of_the_DAG_YOULL_CREATE"
Variable.set(key=nome_variavel, value=tabela_dict, serialize_json=True)

dag_sql = sql_dag(
    nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
    servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW': 'Postgres'},
    nome_banco_destino='destination_database_name',
    nome_banco_origem='source_database_name',
    schema='database_schema',
)
teste_dag = dag_sql.configurar_dag(
    'airflow_schedule', nome_variavel,
    "tags_that_we_use", "tags_that_we_use", "tags_that_we_use",
)
```

### Operating System

Red Hat OpenShift, Airflow Helm chart

### Versions of Apache Airflow Providers

```
pip install apache-airflow-providers-odbc \
    && pip install apache-airflow-providers-microsoft-mssql
```

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

This error occurs on every DAG that is created using the above class: regardless of whether the task runs correctly or not, the pod throws the log cited above and is then not deleted.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
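P.S. A quick hypothetical check to pin down which attribute of the `sql_dag` instance breaks the deep copy (run in a shell where `dag_sql` from the snippet above exists):

```
import copy

for name, value in vars(dag_sql).items():
    try:
        copy.deepcopy(value)
    except TypeError as exc:
        # e.g. engine_destino: cannot pickle 'module' object
        print(f"{name}: {exc}")
```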