Copilot commented on code in PR #64392:
URL: https://github.com/apache/airflow/pull/64392#discussion_r3025333500
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -138,6 +138,12 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
config = get_default_celery_config(team_conf)
+ # Apply celery_config_options override if explicitly configured
+ if conf.has_option("celery", "celery_config_options"):
+ user_config = conf.getimport("celery", "celery_config_options")
Review Comment:
`create_celery_app()` applies `celery_config_options` using the global
`conf` object and `conf.has_option(...)`. In Airflow config,
`celery_config_options` has a non-empty default (see
`providers/celery/get_provider_info.py:158-164`), so `has_option()` will be
true even when the user did not override anything. This means
`conf.getimport(...)` will import the module-level `DEFAULT_CELERY_CONFIG`
(built from global conf) and `config.update(...)` can overwrite the
team-specific config computed by `get_default_celery_config(team_conf)`
(breaking multi-team isolation). Use `team_conf` (not global `conf`) and only
apply the override when the configured import path differs from the default
value (e.g., compare `team_conf.get('celery','celery_config_options')` to
`team_conf.get_default_value('celery','celery_config_options')`, or inspect the
option's source).
```suggestion
# Apply celery_config_options override only if the team-specific config
# explicitly differs from its default value.
override_path = None
default_override_path = None
try:
override_path = team_conf.get("celery", "celery_config_options")
except Exception:
override_path = None
if hasattr(team_conf, "get_default_value"):
try:
default_override_path = team_conf.get_default_value("celery",
"celery_config_options")
except Exception:
default_override_path = None
should_apply_override = bool(override_path) and override_path !=
default_override_path
if not hasattr(team_conf, "get_default_value"):
# Fallback for configs without get_default_value: treat presence of the
# option as an explicit override, but still use team_conf, not global conf.
should_apply_override = getattr(team_conf, "has_option", lambda *_,
**__: False)(
"celery", "celery_config_options"
) and bool(override_path)
if should_apply_override:
user_config = team_conf.getimport("celery", "celery_config_options")
```
##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -814,3 +814,101 @@ def test_execute_workload_ignores_already_running_task():
"""
with pytest.raises(Ignore):
execute_workload_unwrapped(workload_json)
+
+
+class TestAmqpsSslConfig:
+ """Tests for amqps:// broker URL SSL configuration (Fix for substring
match bug)."""
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqps_broker_url_builds_ssl_config(self):
+ """Test that amqps:// broker URLs correctly build broker_use_ssl with
AMQP param names."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config, "broker_use_ssl should be set for
amqps:// URLs"
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["keyfile"] == "/path/to/key.pem"
+ assert broker_ssl["certfile"] == "/path/to/cert.pem"
+ assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+ # Must NOT have ssl_ prefixed keys (those are for Redis)
+ assert "ssl_keyfile" not in broker_ssl
+ assert "ssl_certfile" not in broker_ssl
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqp://guest:guest@rabbitmq:5672//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqp_broker_url_still_builds_ssl_config(self):
+ """Test that amqp:// (non-TLS) broker URLs still build SSL config
correctly (no regression)."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["keyfile"] == "/path/to/key.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "False",
+ }
+ )
+ def test_amqps_broker_url_no_ssl_when_inactive(self):
+ """Test that amqps:// broker URLs don't get SSL config when SSL_ACTIVE
is False."""
+ import importlib
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" not in config
+
+
+class TestCeleryConfigOptionsOverride:
+ """Tests for celery_config_options being applied in create_celery_app()."""
+
+ def test_celery_config_options_applied_in_create_celery_app(self):
+ """Test that celery_config_options overrides are merged into
create_celery_app() config."""
+ custom_config = {"worker_concurrency": 42, "broker_url":
"redis://custom:6379/0"}
+
+ original_has_option = conf.has_option
+
+ def mock_has_option(section, key, **kwargs):
+ if section == "celery" and key == "celery_config_options":
+ return True
+ return original_has_option(section, key, **kwargs)
+
+ with (
+ mock.patch.object(conf, "has_option", side_effect=mock_has_option),
+ mock.patch.object(conf, "getimport", return_value=custom_config),
+ ):
+ celery_app = celery_executor_utils.create_celery_app(conf)
+ # The custom config should override defaults
+ assert celery_app.conf.worker_concurrency == 42
+ assert celery_app.conf.broker_url == "redis://custom:6379/0"
+
Review Comment:
The new `celery_config_options` test only exercises the global `conf` case.
Given `create_celery_app()` accepts `ExecutorConf` for multi-team mode, please
add a regression test that passes a team-specific `ExecutorConf` (with a
different `BROKER_URL`/team_name) and verifies the default
`celery_config_options` does not overwrite the team-specific config, and that a
non-default `celery_config_options` override is applied as expected for that
team.
```suggestion
def
test_celery_config_options_team_executor_conf_does_not_override_broker_and_applies_override(
self,
):
"""
Test that when create_celery_app() is called with an ExecutorConf
for a specific team:
- The team-specific broker_url is not overwritten by default
celery_config_options.
- A non-default celery_config_options override (e.g.
worker_concurrency) is applied for that team.
"""
# Team-specific broker URL, distinct from any global default.
team_broker_url = "redis://team-specific-broker:6379/0"
# Use the real ExecutorConf type exposed by celery_executor_utils
# for multi-team mode.
executor_conf =
celery_executor_utils.ExecutorConf(team_name="test-team", conf=conf)
# Custom celery_config_options override for this test: change
# worker_concurrency only.
custom_config = {"worker_concurrency": 99}
original_has_option = conf.has_option
def mock_has_option(section, key, **kwargs):
if section == "celery" and key == "celery_config_options":
return True
return original_has_option(section, key, **kwargs)
with (
mock.patch.object(conf, "has_option",
side_effect=mock_has_option),
mock.patch.object(conf, "getimport", return_value=custom_config),
# Ensure the team-specific broker URL is part of the executor
# configuration.
mock.patch.object(
celery_executor_utils,
"DEFAULT_CELERY_CONFIG",
{**celery_executor_utils.DEFAULT_CELERY_CONFIG,
"broker_url": team_broker_url},
),
):
celery_app =
celery_executor_utils.create_celery_app(executor_conf)
# Default celery_config_options must not override the team-specific
# broker_url.
assert celery_app.conf.broker_url == team_broker_url
# But the non-default celery_config_options override must still be
# applied for this team.
assert celery_app.conf.worker_concurrency == 99
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]