pablorecio opened a new issue #18114: URL: https://github.com/apache/airflow/issues/18114
### Apache Airflow version

2.1.3 (latest released)

### Operating System

Debian GNU/Linux 10 (buster)

### Versions of Apache Airflow Providers

```
apache-airflow-providers-amazon==2.1.0
apache-airflow-providers-celery==2.0.0
apache-airflow-providers-cncf-kubernetes==2.0.2
apache-airflow-providers-docker==2.1.0
apache-airflow-providers-elasticsearch==2.0.2
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==5.0.0
apache-airflow-providers-grpc==2.0.0
apache-airflow-providers-hashicorp==2.0.0
apache-airflow-providers-http==2.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-microsoft-azure==3.1.0
apache-airflow-providers-mysql==2.1.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-redis==2.0.0
apache-airflow-providers-sendgrid==2.0.0
apache-airflow-providers-sftp==2.1.0
apache-airflow-providers-slack==4.0.0
apache-airflow-providers-sqlite==2.0.0
apache-airflow-providers-ssh==2.1.0
```

### Deployment

Docker-Compose

### Deployment details

Followed the guide here: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment

### What happened

I have the following DAG:

```python
from datetime import datetime, timedelta

# import requests
from airflow.decorators import dag, task
from airflow.models import Variable

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
DEFAULT_ARGS = {
    "owner": "BI Team",
    "depends_on_past": True,
    "start_date": datetime(2021, 7, 1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

COMMIT_REF = Variable.get("commit_ref", "master")
EXECUTION_DATE = "{{ ds }}"


@dag(
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=8),
    schedule_interval="0 16 * * MON",
    max_active_runs=1,
    params={"alert_environment": COMMIT_REF},
)
def employees_etl():
    @task()
    def extract():
        print(f"Extract: Hello, world! Today it is {EXECUTION_DATE}")
        return [i for i in range(10)]

    @task()
    def transform(events: list):
        print(f"Transform: Hello, world! Today it is {EXECUTION_DATE}")
        return [i * 2 for i in events]

    @task()
    def load_into_redshift(events: list):
        print(f"Load: Hello, world! Today it is {EXECUTION_DATE}")
        print(f"Result: {', '.join(map(str, events))}")
        return

    events = extract()
    transformed_events = transform(events)
    load_into_redshift(transformed_events)


employees_etl_dag = employees_etl()
```

If I copy this file into my Airflow `dags/` folder as is, it works as expected without any issues. However, if I compress it into a `.zip` file like:

```
zip -r ../my-dags.zip ./*
```

and move that file into Airflow's `dags/` folder, I can see the DAG just fine and execute it. But then I get the following error from the worker:

```
airflow-worker_1 | [2021-09-09 10:30:32,286: INFO/ForkPoolWorker-15] Filling up the DagBag from /home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py
airflow-worker_1 | [2021-09-09 10:30:32,299: ERROR/ForkPoolWorker-15] Failed to import: /home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py
airflow-worker_1 | Traceback (most recent call last):
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 326, in _load_modules_from_file
airflow-worker_1 |     loader.exec_module(new_module)
airflow-worker_1 |   File "<frozen importlib._bootstrap_external>", line 843, in exec_module
airflow-worker_1 |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2076, in <module>
airflow-worker_1 |     class DagTag(Base):
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/api.py", line 76, in __init__
airflow-worker_1 |     _as_declarative(cls, classname, cls.__dict__)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py", line 131, in _as_declarative
airflow-worker_1 |     _MapperConfig.setup_mapping(cls, classname, dict_)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py", line 160, in setup_mapping
airflow-worker_1 |     cfg_cls(cls_, classname, dict_)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py", line 190, in __init__
airflow-worker_1 |     self._setup_table()
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/ext/declarative/base.py", line 534, in _setup_table
airflow-worker_1 |     cls.__table__ = table = table_cls(
airflow-worker_1 |   File "<string>", line 2, in __new__
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 139, in warned
airflow-worker_1 |     return fn(*args, **kwargs)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 540, in __new__
airflow-worker_1 |     raise exc.InvalidRequestError(
airflow-worker_1 | sqlalchemy.exc.InvalidRequestError: Table 'dag_tag' is already defined for this MetaData instance.  Specify 'extend_existing=True' to redefine options and columns on an existing Table object.
airflow-worker_1 | [2021-09-09 10:30:32,383: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: hj_employees_etl. Either the dag did not exist or it failed to parse..
airflow-worker_1 | Traceback (most recent call last):
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
airflow-worker_1 |     args.func(args)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
airflow-worker_1 |     return func(*args, **kwargs)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
airflow-worker_1 |     return f(*args, **kwargs)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 220, in task_run
airflow-worker_1 |     dag = get_dag(args.subdir, args.dag_id)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 192, in get_dag
airflow-worker_1 |     raise AirflowException(
airflow-worker_1 | airflow.exceptions.AirflowException: dag_id could not be found: hj_employees_etl. Either the dag did not exist or it failed to parse.
airflow-worker_1 | [2021-09-09 10:30:32,398: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[5aebb4af-26d5-45b9-a38e-9ebca5c4d11f] raised unexpected: AirflowException('Celery command failed on host: 58998d2f71dc')
airflow-worker_1 | Traceback (most recent call last):
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
airflow-worker_1 |     R = retval = fun(*args, **kwargs)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
airflow-worker_1 |     return self.run(*args, **kwargs)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
airflow-worker_1 |     _execute_in_fork(command_to_exec)
airflow-worker_1 |   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
airflow-worker_1 |     raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow-worker_1 | airflow.exceptions.AirflowException: Celery command failed on host: 58998d2f71dc
airflow-scheduler_1 | [2021-09-09 10:30:33,390] {scheduler_job.py:611} INFO - Executor reports execution of hj_employees_etl.extract execution_date=2021-07-05 16:00:00+00:00 exited with status failed for try_number 1
```

### What you expected to happen

I would expect DAGs implemented with the TaskFlow API to work the same regardless of whether they are inside a `.zip` file or not.

### How to reproduce

- Follow the guide to set up Airflow with Docker Compose: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
- Add a simple DAG that uses the TaskFlow API to your `dags/` folder
- Verify that it works
- Compress the DAG into a `.zip` file
- Try again

### Anything else

I've seen this same behaviour in our k8s cluster; I just managed to reproduce it locally by following the steps above.
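For anyone reproducing this without the `zip` CLI, the packaging step can be done from Python with the stdlib `zipfile` module. This is a sketch only; the `my_dags/` and `dags/` folder names are assumptions matching the docker-compose layout from the guide, and the function name is hypothetical:

```python
import zipfile
from pathlib import Path

def package_dags(src_dir: str, target_zip: str) -> None:
    """Zip every file under src_dir into target_zip, mirroring `zip -r ../my-dags.zip ./*`."""
    src = Path(src_dir)
    with zipfile.ZipFile(target_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(src.rglob("*")):
            if path.is_file():
                # Store paths relative to src_dir so the DAG module sits at the
                # archive root, where Airflow's zip loader looks for it.
                zf.write(path, path.relative_to(src))

# Hypothetical usage, matching the docker-compose layout:
# package_dags("my_dags", "dags/my-dags.zip")
```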
### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
