ImmortalLotus opened a new issue, #35179:
URL: https://github.com/apache/airflow/issues/35179

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   airflow throws this error when trying to terminate a task that comes from a 
DAG that is created using DAG decorator and then it can't terminate the pod 
when using Kubernetes Executor.
   
   It runs the task correctly, however when killing the pod it throws this 
error and then does not delete the pod.
   
   exit log:
   ```
   [2023-10-25T13:23:21.856+0000] {local_task_job_runner.py:228} INFO - Task 
exited with return code 0
   Traceback (most recent call last):
   File "/home/airflow/.local/bin/airflow", line 8, in <module>
   sys.exit(main())
   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", 
line 60, in main
   args.func(args)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", 
line 49, in command
   return func(*args, **kwargs)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 
113, in wrapper
   return f(*args, **kwargs)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py",
 line 430, in task_run
   task_return_code = _run_task_by_selected_method(args, _dag, ti)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py",
 line 208, in _run_task_by_selected_method
   return _run_task_by_local_task_job(args, ti)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py",
 line 270, in _run_task_by_local_task_job
   ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", 
line 77, in wrapper
   return func(*args, session=session, **kwargs)
   File "/home/airflow/.local/lib/python3.8/site-packages/airf
   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", 
line 318, in execute_job
   ret = execute_callable()
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py",
 line 192, in _execute
   self.handle_task_exit(return_code)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py",
 line 232, in handle_task_exit
   
self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", 
line 77, in wrapper
   return func(*args, session=session, **kwargs)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 2754, in schedule_downstream_tasks
   partial_dag = task.dag.partial_subset(
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 
2393, in partial_subset
   dag.task_dict = {
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 
2394, in <dictcomp>
   t.task_id: _deepcopy_task(t)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 
2391, in _deepcopy_task
   return copy.deepcopy(t, memo)
   File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
   y = copier(memo)
   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py",
 line 1214, in __deepcopy__
   setattr(result, k, copy.deepcopy(v, memo))
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
   y = _reconstruct(x, memo, *rv)
   File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
   state = deepcopy(state, memo)
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
   y = _reconstruct(x, memo, *rv)
   File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
   state = deepcopy(state, memo)
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
   y = _reconstruct(x, memo, *rv)
   File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
   state = deepcopy(state, memo)
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
   y = _reconstruct(x, memo, *rv)
   File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
   state = deepcopy(state, memo)
   File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
   y = copier(x, memo)
   File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
   y[deepcopy(key, memo)] = deepcopy(value, memo)
   File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
   rv = reductor(4)
   TypeError: cannot pickle 'module' object
   ```
   
   ### What you think should happen instead
   
   It should terminate correctly and delete the pod.
   
   ### How to reproduce
   
   Use the following class(that is still not perfect) to generate a dag like 
below. the purpose of this class is to encapsulate some common usage we have 
when working some speficic EL jobs: 
   ```
   from __future__ import annotations
   from datetime import datetime, timedelta
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import BranchPythonOperator
   from airflow.operators.dummy_operator import DummyOperator
   from docker.types import Mount
   from airflow.operators.dummy_operator import DummyOperator
   from kubernetes.client import models as k8s
   from airflow.models import Variable
   from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
   from airflow.providers.postgres.hooks.postgres import PostgresHook
   
   from typing import TYPE_CHECKING, Any
   
   import httpx
   import pendulum
   from sqlalchemy import create_engine
   from airflow.decorators import dag, task
   from airflow.models.baseoperator import BaseOperator
   from airflow.operators.email import EmailOperator
   from airflow.utils.task_group import TaskGroup
   from airflow.operators.python import PythonOperator
   if TYPE_CHECKING:
       from airflow.utils.context import Context
       
   
       # [START dag_decorator_usage]
   class sql_dag():
       
       def __init__(self,nome_servidor_destino, servidor_origem_dict:dict, 
nome_banco_destino , nome_banco_origem, schema):
           conexao = MsSqlHook.get_connection(nome_servidor_destino)
           
self.hook_banco_destino=MsSqlHook(mssql_conn_id=nome_servidor_destino)
           self.engine_destino = 
create_engine(f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES')
           for chave, valor in servidor_origem_dict.items():
               if(valor=='Postgres'):
                   self.hook_banco_origem=PostgresHook(postgres_conn_id=chave)
               elif(valor=='MSSQL'):
                   self.hook_banco_origem=MsSqlHook(mssql_conn_id=chave)
               else:
                   raise Exception("banco não suportado")
           self.schema = schema
           self.sql_template_padrao=f'use [{nome_banco_origem}] select * from '
           self.truncate_sql=f"""use [{nome_banco_destino}] truncate table """
           
       def configurar_dag(self,horario:str,nome:str, origem,acao,destino):   
           tabela_dict= Variable.get(nome, default_var={"erro":["erro"]}, 
deserialize_json=True)      
           
           def truncate_tabelas(**kwargs):
               sql3=kwargs['self'].truncate_sql+kwargs['tabela']
               kwargs['self'].hook_banco_destino.run(sql3)
           
           def insert_tabelas(**kwargs):
               sql4 = kwargs['self'].sql_template_padrao+kwargs['tabela']
               df =  kwargs['self'].hook_banco_origem.get_pandas_df(sql4)
               df.to_sql(kwargs['tabela'], 
schema=kwargs['self'].schema,con=kwargs['self'].engine_destino,if_exists='append',
 index=False)   
               
           @dag(
               schedule=horario,
               dag_id=nome,
               start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
               catchup=False,
               tags=[origem,acao,destino]
           )        
           def criar():     
               start = DummyOperator(
                   task_id='comecaDag'
                   )       
               end = DummyOperator(
                   task_id='endDag'
                   )
               with TaskGroup(group_id=f"insert_tabelas") as insert:
                   se_estiver_vazio=DummyOperator(
                               task_id='pra_evitar_erros'
                               )  
                   for chave,valor in tabela_dict.items():
                       if(valor=='padraoSemDtInsert'):
                           task_insert_tabelas = 
PythonOperator(task_id=f'insert_{chave}',
                               python_callable=insert_tabelas,
                               op_kwargs={"tabela":chave, "self":self}
                               )      
               with TaskGroup(group_id=f"truncate_tabelas") as truncate:
                   se_estiver_vazio2=DummyOperator(
                               task_id='pra_evitar_erros2'
                               )  
                   for chave, valor in tabela_dict.items():
                           task_truncate_tabelas = 
PythonOperator(task_id=f'truncate_{chave}',
                               python_callable=truncate_tabelas,
                               op_kwargs={"tabela":chave, "self":self}
                               )  
               start>>truncate>>insert>>end
           dag_teste=criar()
   
   ```
   
   code to use the above class, that works when considering a MSSQL destination 
and a PGSQL/MSSQL Source:
   ```
   
   from cagd_libs.BANCOS_SQL import sql_dag
   from airflow.models import Variable
   
   tabela_dict={"YourTable":"padraoSemDtInsert"
                }
   
   nome_variavel="ID_Of_the_DAG_YOULL_CREATE"
   
   Variable.set(key=nome_variavel, value=tabela_dict, serialize_json=True)
   
dag_sql=sql_dag(nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
                   
servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW':'Postgres'},
                   nome_banco_destino='destination_database_name',
                   nome_banco_origem='source_database_name',
                   schema='database_schema')
   teste_dag= 
dag_sql.configurar_dag('airflow_schedule',nome_variavel,"tags_that_we_use","tags_that_we_use","tags_that_we_use")
   ```
   
   ### Operating System
   
   Red Hat Openshift, airflow HelmChart
   
   ### Versions of Apache Airflow Providers
   
   pip install apache-airflow-providers-odbc \
       && pip install apache-airflow-providers-microsoft-mssql
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   this error occus on every that that is creating using above class, 
regardless of whether it runs correctly or not, the pod throws the log cited 
above and then is not deleted
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to