kaxil commented on code in PR #67127:
URL: https://github.com/apache/airflow/pull/67127#discussion_r3262851195
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -160,6 +160,21 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
     return celery_app
+@lru_cache(maxsize=8)
+def _get_celery_app_for_workload(team_name: str | None) -> Celery:
+    """Return a subprocess-local Celery app cached by team name for task publishing."""
Review Comment:
The "subprocess-local" claim only holds when `_send_workloads_to_celery`
actually uses the `ProcessPoolExecutor` branch. In
`celery_executor.py:243-245`, the single-workload (or `sync_parallelism=1`)
path runs `send_workload_to_executor` inline via `map(...)` in the scheduler
process. In that case this cache lives in the scheduler itself and keeps the
cached Celery apps' broker connections open there too. Worth either updating
the docstring to reflect "scheduler process or publisher subprocess, depending
on path", or being explicit that this is intentional.
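To illustrate the point above, here is a minimal sketch (not the Airflow code) of how an `lru_cache` lives in whichever process calls the decorated function. `get_app_for_team` and `send_inline` are hypothetical stand-ins for `_get_celery_app_for_workload` and the inline `map(...)` path; the dict merely stands in for a Celery app and records the PID that created it.

```python
import os
from functools import lru_cache
from typing import Optional


@lru_cache(maxsize=8)
def get_app_for_team(team_name: Optional[str]) -> dict:
    # Hypothetical stand-in for building a Celery app; records the creating process.
    return {"team": team_name, "pid": os.getpid()}


def send_inline(team_names: list) -> list:
    # Mirrors an inline map(...) call: everything runs in the caller's process,
    # so the cache (and anything the cached objects hold open) lives there too.
    return list(map(get_app_for_team, team_names))


apps = send_inline(["team-a", "team-a", None])
```

When the call goes through a process pool instead, each worker process would populate its own independent copy of the cache, which is the only case where "subprocess-local" is accurate.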
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -160,6 +160,21 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
     return celery_app
+@lru_cache(maxsize=8)
+def _get_celery_app_for_workload(team_name: str | None) -> Celery:
+    """Return a subprocess-local Celery app cached by team name for task publishing."""
+    if TYPE_CHECKING:
+        _conf: ExecutorConf | AirflowConfigParser
Review Comment:
This `if TYPE_CHECKING: _conf: ExecutorConf | AirflowConfigParser` is a
copy-paste leftover from the old `send_workload_to_executor`. The annotation
never escapes this function and `_conf` already gets a concrete type from the
if/else assignment below, so this whole two-line block can be dropped.
##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -77,6 +77,13 @@
pytestmark = pytest.mark.db_test
+@pytest.fixture(autouse=True)
+def clear_cached_workload_celery_apps():
+    celery_executor_utils._get_celery_app_for_workload.cache_clear()
+    yield
+    celery_executor_utils._get_celery_app_for_workload.cache_clear()
Review Comment:
The two new tests cover `team-a` reuse and `team-a` vs `team-b` separation,
but not `team_name=None`, which is the path the vast majority of deployments
hit (no multi-team config). A third parametrized case asserting `None` also
hits the cache would catch a regression where `lru_cache` started treating
`None` specially.
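A rough sketch of what the suggested check could look like, written without Airflow or pytest so it runs on its own. `fake_create_app`, `get_app_for_workload`, and `check_cache_reuse` are hypothetical names, not part of the PR; in the real test module the loop at the bottom would more naturally be a `pytest.mark.parametrize` over `"team-a"` and `None`.

```python
from functools import lru_cache
from typing import Optional
from unittest import mock

# Hypothetical stand-in for create_celery_app; the mock counts how often it is called.
fake_create_app = mock.Mock(side_effect=lambda team_name: object())


@lru_cache(maxsize=8)
def get_app_for_workload(team_name: Optional[str]):
    # Stand-in for _get_celery_app_for_workload: one app per distinct team name.
    return fake_create_app(team_name)


def check_cache_reuse(team_name: Optional[str]) -> None:
    # Start from a clean cache, as the autouse fixture in the diff does.
    get_app_for_workload.cache_clear()
    fake_create_app.reset_mock()

    first = get_app_for_workload(team_name)
    second = get_app_for_workload(team_name)

    # The second call must return the cached object without rebuilding the app.
    assert first is second
    assert fake_create_app.call_count == 1


for name in ("team-a", None):
    check_cache_reuse(name)
```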
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.