This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 17290c67514 AIP-67 - Multi Team: Pass args/kwargs to super in
CeleryExecutor (#56006)
17290c67514 is described below
commit 17290c675146de516924acddd15d842295555e98
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue Sep 23 15:57:42 2025 -0700
AIP-67 - Multi Team: Pass args/kwargs to super in CeleryExecutor (#56006)
* pass args/kwargs to super in Celery executors
This allows team_name to be passed to the superclass. This is
low-hanging fruit that lets the Celery executor be used for multi-team
testing.
More changes will be needed to allow the Celery executor to use team-based
config, but that will be done at a future time.
* Fix compat tests
---
.../providers/celery/executors/celery_executor.py | 4 ++--
.../unit/celery/executors/test_celery_executor.py | 18 ++++++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
index 95c4c97f12c..6091dc5c124 100644
--- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
+++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
@@ -290,8 +290,8 @@ class CeleryExecutor(BaseExecutor):
# TODO: TaskSDK: move this type change into BaseExecutor
queued_tasks: dict[TaskInstanceKey, workloads.All] # type:
ignore[assignment]
- def __init__(self):
- super().__init__()
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
# Celery doesn't support bulk sending the tasks (which can become a
bottleneck on bigger clusters)
# so we use a multiprocessing pool to speed this up.
diff --git
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 9384ef626ce..771b8b232bd 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -126,6 +126,24 @@ class TestCeleryExecutor:
def test_cli_commands_vended(self):
assert CeleryExecutor.get_cli_commands()
+ def test_celery_executor_init_with_args_kwargs(self):
+ """Test that CeleryExecutor properly passes args and kwargs to
BaseExecutor."""
+ parallelism = 50
+ team_name = "test_team"
+
+ if AIRFLOW_V_3_1_PLUS:
+ # team_name was added in Airflow 3.1
+ executor = celery_executor.CeleryExecutor(parallelism=parallelism,
team_name=team_name)
+ else:
+ executor = celery_executor.CeleryExecutor(parallelism)
+
+ assert executor.parallelism == parallelism
+
+ if AIRFLOW_V_3_1_PLUS:
+ # team_name was added in Airflow 3.1
+ assert executor.team_name == team_name
+ assert executor.conf.team_name == team_name
+
@pytest.mark.backend("mysql", "postgres")
def test_exception_propagation(self, caplog):
caplog.set_level(