pablorecio opened a new issue #18114:
URL: https://github.com/apache/airflow/issues/18114


   ### Apache Airflow version
   
   2.1.3 (latest released)
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==2.1.0
   apache-airflow-providers-celery==2.0.0
   apache-airflow-providers-cncf-kubernetes==2.0.2
   apache-airflow-providers-docker==2.1.0
   apache-airflow-providers-elasticsearch==2.0.2
   apache-airflow-providers-ftp==2.0.0
   apache-airflow-providers-google==5.0.0
   apache-airflow-providers-grpc==2.0.0
   apache-airflow-providers-hashicorp==2.0.0
   apache-airflow-providers-http==2.0.0
   apache-airflow-providers-imap==2.0.0
   apache-airflow-providers-microsoft-azure==3.1.0
   apache-airflow-providers-mysql==2.1.0
   apache-airflow-providers-postgres==2.0.0
   apache-airflow-providers-redis==2.0.0
   apache-airflow-providers-sendgrid==2.0.0
   apache-airflow-providers-sftp==2.1.0
   apache-airflow-providers-slack==4.0.0
   apache-airflow-providers-sqlite==2.0.0
   apache-airflow-providers-ssh==2.1.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Followed the guide here: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment
   
   ### What happened
   
   I have the following DAG:
   
   ```python
   from datetime import datetime, timedelta
   
   # import requests
   from airflow.decorators import dag, task
   from airflow.models import Variable
   
   
   
   # These args will get passed on to each operator
   # You can override them on a per-task basis during operator initialization
   DEFAULT_ARGS = {
       "owner": "BI Team",
       "depends_on_past": True,
       "start_date": datetime(2021, 7, 1),
       "email": ["[email protected]"],
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 3,
       "retry_delay": timedelta(minutes=5),
   }
   COMMIT_REF = Variable.get("commit_ref", "master")
   EXECUTION_DATE = "{{ ds }}"
   
   
   @dag(
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=8),
       schedule_interval="0 16 * * MON",
       max_active_runs=1,
       params={"alert_environment": COMMIT_REF}
   )
   def employees_etl():
   
       @task()
       def extract():
           print(f"Extract: Hello, world! Today it is {EXECUTION_DATE}")
           return [i for i in range(10)]
   
       @task()
       def transform(events: list):
           print(f"Transform: Hello, world! Today it is {EXECUTION_DATE}")
           return [i * 2 for i in events]
   
       @task()
       def load_into_redshift(events: list):
           print(f"Load: Hello, world! Today it is {EXECUTION_DATE}")
           print(f"Result: {', '.join(map(str, events))}")
           return
   
       events = extract()
       transformed_events = transform(events)
       load_into_redshift(transformed_events)
   
   
   employees_etl_dag = employees_etl()
   ```
   
   If I copy this file into my Airflow's `dags/` folder as is, it works as 
expected without any issues. However, if I compress it into a `.zip` file like:
   
   ```bash
   zip -r ../my-dags.zip ./*
   ```
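
   For what it's worth, the same archive can be built and inspected with Python's stdlib `zipfile` (the file names below are made up for illustration). As far as I understand, Airflow only scans the root of a packaged-DAG zip, which the `zip -r` invocation above, run from inside the DAG directory, achieves:

```python
import tempfile
import zipfile
from pathlib import Path

# Made-up file names, just to illustrate the expected archive layout.
workdir = Path(tempfile.mkdtemp())
dag_file = workdir / "employees_etl.py"
dag_file.write_text("# the DAG module from above would go here\n")

# The .py file is added without any leading directories so it sits at
# the root of the archive, where Airflow looks for DAG modules.
archive = workdir / "my-dags.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.write(dag_file, arcname=dag_file.name)

with zipfile.ZipFile(archive) as zf:
    print(zf.namelist())  # -> ['employees_etl.py']
```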
   
   And move that file into Airflow's `dags/` folder, I can still see the DAG 
just fine and trigger it. But then I get the following error from the worker:
   
   ```
   airflow-worker_1     | [2021-09-09 10:30:32,286: INFO/ForkPoolWorker-15] 
Filling up the DagBag from 
/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py
   airflow-worker_1     | [2021-09-09 10:30:32,299: ERROR/ForkPoolWorker-15] 
Failed to import: 
/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py
   airflow-worker_1     | Traceback (most recent call last):
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", 
line 326, in _load_modules_from_file
   airflow-worker_1     |     loader.exec_module(new_module)
   airflow-worker_1     |   File "<frozen importlib._bootstrap_external>", line 
843, in exec_module
   airflow-worker_1     |   File "<frozen importlib._bootstrap>", line 219, in 
_call_with_frames_removed
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 
2076, in <module>
   airflow-worker_1     |     class DagTag(Base):
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/api.py",
 line 76, in __init__
   airflow-worker_1     |     _as_declarative(cls, classname, cls.__dict__)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py",
 line 131, in _as_declarative
   airflow-worker_1     |     _MapperConfig.setup_mapping(cls, classname, dict_)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py",
 line 160, in setup_mapping
   airflow-worker_1     |     cfg_cls(cls_, classname, dict_)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py",
 line 190, in __init__
   airflow-worker_1     |     self._setup_table()
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py",
 line 534, in _setup_table
   airflow-worker_1     |     cls.__table__ = table = table_cls(
   airflow-worker_1     |   File "<string>", line 2, in __new__
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py",
 line 139, in warned
   airflow-worker_1     |     return fn(*args, **kwargs)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", 
line 540, in __new__
   airflow-worker_1     |     raise exc.InvalidRequestError(
   airflow-worker_1     | sqlalchemy.exc.InvalidRequestError: Table 'dag_tag' 
is already defined for this MetaData instance.  Specify 'extend_existing=True' 
to redefine options and columns on an existing Table object.
   airflow-worker_1     | [2021-09-09 10:30:32,383: ERROR/ForkPoolWorker-15] 
Failed to execute task dag_id could not be found: hj_employees_etl. Either the 
dag did not exist or it failed to parse..
   airflow-worker_1     | Traceback (most recent call last):
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py",
 line 117, in _execute_in_fork
   airflow-worker_1     |     args.func(args)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", 
line 48, in command
   airflow-worker_1     |     return func(*args, **kwargs)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 
91, in wrapper
   airflow-worker_1     |     return f(*args, **kwargs)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py",
 line 220, in task_run
   airflow-worker_1     |     dag = get_dag(args.subdir, args.dag_id)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 
192, in get_dag
   airflow-worker_1     |     raise AirflowException(
   airflow-worker_1     | airflow.exceptions.AirflowException: dag_id could not 
be found: hj_employees_etl. Either the dag did not exist or it failed to parse.
   airflow-worker_1     | [2021-09-09 10:30:32,398: ERROR/ForkPoolWorker-15] 
Task 
airflow.executors.celery_executor.execute_command[5aebb4af-26d5-45b9-a38e-9ebca5c4d11f]
 raised unexpected: AirflowException('Celery command failed on host: 
58998d2f71dc')
   airflow-worker_1     | Traceback (most recent call last):
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 
412, in trace_task
   airflow-worker_1     |     R = retval = fun(*args, **kwargs)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 
704, in __protected_call__
   airflow-worker_1     |     return self.run(*args, **kwargs)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py",
 line 88, in execute_command
   airflow-worker_1     |     _execute_in_fork(command_to_exec)
   airflow-worker_1     |   File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py",
 line 99, in _execute_in_fork
   airflow-worker_1     |     raise AirflowException('Celery command failed on 
host: ' + get_hostname())
   airflow-worker_1     | airflow.exceptions.AirflowException: Celery command 
failed on host: 58998d2f71dc
   airflow-scheduler_1  | [2021-09-09 10:30:33,390] {scheduler_job.py:611} INFO 
- Executor reports execution of hj_employees_etl.extract 
execution_date=2021-07-05 16:00:00+00:00 exited with status failed for 
try_number 1
   ```
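
   What strikes me about the log is that the worker fills the DagBag from `/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py` rather than from the zip, so the DAG's file location seems to be resolved incorrectly when it lives inside an archive. As a rough illustration (the helper name and logic are my own sketch, not Airflow's actual code), loading a zipped DAG requires splitting the reported location into the archive path and the member inside it:

```python
import os
import zipfile

def split_zip_path(fileloc: str):
    """Hypothetical helper: split '/dags/my-dags.zip/my_dag.py' into the
    archive path and the member name, mirroring the kind of resolution a
    DAG processor must do before it can load a DAG stored inside a zip."""
    path = fileloc
    while path and path != os.path.sep:
        if path.lower().endswith(".zip") or zipfile.is_zipfile(path):
            return path, os.path.relpath(fileloc, path)
        path = os.path.dirname(path)
    # Plain .py file, not inside an archive.
    return fileloc, None

print(split_zip_path("/opt/airflow/dags/my-dags.zip/employees_etl.py"))
# -> ('/opt/airflow/dags/my-dags.zip', 'employees_etl.py')
```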
   
   ### What you expected to happen
   
   I would expect DAGs implemented with the TaskFlow API to work the same 
regardless of whether they're inside a `.zip` file or not.
   
   ### How to reproduce
   
   - Follow the guide to setup airflow with docker compose: 
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
   - Add a simple DAG that uses the TaskFlow API inside your `dags/` folder
   - Try to verify that it works
   - Compress the DAG into a `.zip` file
   - Try again
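
   To rule out the archive itself, one can check that plain Python has no trouble importing a module from inside a zip (the module name and contents below are made up), which suggests the problem lies in how the worker resolves the DAG's location rather than in zip imports per se:

```python
import sys
import tempfile
import zipfile
from pathlib import Path

# Made-up stand-in for the real DAG module.
workdir = Path(tempfile.mkdtemp())
archive = workdir / "my-dags.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("employees_etl.py", "GREETING = 'Hello, world!'\n")

# A zip on sys.path is importable via Python's built-in zipimport hook.
sys.path.insert(0, str(archive))
import employees_etl

print(employees_etl.GREETING)  # -> Hello, world!
```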
   
   ### Anything else
   
   I've seen this same behaviour in our k8s cluster; I've just now managed to 
reproduce it locally by following the steps above.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

