Copilot commented on code in PR #64392:
URL: https://github.com/apache/airflow/pull/64392#discussion_r3025333500


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -138,6 +138,12 @@ def create_celery_app(team_conf: ExecutorConf | 
AirflowConfigParser) -> Celery:
 
     config = get_default_celery_config(team_conf)
 
+    # Apply celery_config_options override if explicitly configured
+    if conf.has_option("celery", "celery_config_options"):
+        user_config = conf.getimport("celery", "celery_config_options")

Review Comment:
   `create_celery_app()` applies `celery_config_options` using the global 
`conf` object and `conf.has_option(...)`. In the Airflow config, 
`celery_config_options` has a non-empty default (see 
`providers/celery/get_provider_info.py:158-164`), so `has_option()` returns 
`True` even when the user did not override anything. As a result, 
`conf.getimport(...)` imports the module-level `DEFAULT_CELERY_CONFIG` 
(built from the global `conf`), and `config.update(...)` can overwrite the 
team-specific config computed by `get_default_celery_config(team_conf)`, 
breaking multi-team isolation. Use `team_conf` (not the global `conf`) and only 
apply the override when the configured import path differs from the default 
value (e.g., compare `team_conf.get('celery', 'celery_config_options')` to 
`team_conf.get_default_value('celery', 'celery_config_options')`, or inspect the 
option's source).
   ```suggestion
       # Apply celery_config_options override only if the team-specific config
       # explicitly differs from its default value.
       override_path = None
       default_override_path = None
   
       try:
           override_path = team_conf.get("celery", "celery_config_options")
       except Exception:
           override_path = None
   
       if hasattr(team_conf, "get_default_value"):
           try:
               default_override_path = team_conf.get_default_value("celery", 
"celery_config_options")
           except Exception:
               default_override_path = None
   
       should_apply_override = bool(override_path) and override_path != 
default_override_path
   
       if not hasattr(team_conf, "get_default_value"):
           # Fallback for configs without get_default_value: treat presence of 
the
           # option as an explicit override, but still use team_conf, not 
global conf.
           should_apply_override = getattr(team_conf, "has_option", lambda *_, 
**__: False)(
               "celery", "celery_config_options"
           ) and bool(override_path)
   
       if should_apply_override:
           user_config = team_conf.getimport("celery", "celery_config_options")
   ```



##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -814,3 +814,101 @@ def test_execute_workload_ignores_already_running_task():
         """
         with pytest.raises(Ignore):
             execute_workload_unwrapped(workload_json)
+
+
+class TestAmqpsSslConfig:
+    """Tests for amqps:// broker URL SSL configuration (Fix for substring 
match bug)."""
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqps_broker_url_builds_ssl_config(self):
+        """Test that amqps:// broker URLs correctly build broker_use_ssl with 
AMQP param names."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config, "broker_use_ssl should be set for 
amqps:// URLs"
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["keyfile"] == "/path/to/key.pem"
+        assert broker_ssl["certfile"] == "/path/to/cert.pem"
+        assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+        # Must NOT have ssl_ prefixed keys (those are for Redis)
+        assert "ssl_keyfile" not in broker_ssl
+        assert "ssl_certfile" not in broker_ssl
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqp://guest:guest@rabbitmq:5672//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqp_broker_url_still_builds_ssl_config(self):
+        """Test that amqp:// (non-TLS) broker URLs still build SSL config 
correctly (no regression)."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["keyfile"] == "/path/to/key.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "False",
+        }
+    )
+    def test_amqps_broker_url_no_ssl_when_inactive(self):
+        """Test that amqps:// broker URLs don't get SSL config when SSL_ACTIVE 
is False."""
+        import importlib
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" not in config
+
+
+class TestCeleryConfigOptionsOverride:
+    """Tests for celery_config_options being applied in create_celery_app()."""
+
+    def test_celery_config_options_applied_in_create_celery_app(self):
+        """Test that celery_config_options overrides are merged into 
create_celery_app() config."""
+        custom_config = {"worker_concurrency": 42, "broker_url": 
"redis://custom:6379/0"}
+
+        original_has_option = conf.has_option
+
+        def mock_has_option(section, key, **kwargs):
+            if section == "celery" and key == "celery_config_options":
+                return True
+            return original_has_option(section, key, **kwargs)
+
+        with (
+            mock.patch.object(conf, "has_option", side_effect=mock_has_option),
+            mock.patch.object(conf, "getimport", return_value=custom_config),
+        ):
+            celery_app = celery_executor_utils.create_celery_app(conf)
+            # The custom config should override defaults
+            assert celery_app.conf.worker_concurrency == 42
+            assert celery_app.conf.broker_url == "redis://custom:6379/0"
+

Review Comment:
   The new `celery_config_options` test only exercises the global `conf` case. 
Given that `create_celery_app()` accepts an `ExecutorConf` for multi-team 
mode, please add a regression test that passes a team-specific `ExecutorConf` 
(with a different `BROKER_URL`/`team_name`) and verifies that the default 
`celery_config_options` does not overwrite the team-specific config, and that a 
non-default `celery_config_options` override is applied as expected for that 
team.
   ```suggestion
   
       def 
test_celery_config_options_team_executor_conf_does_not_override_broker_and_applies_override(
           self,
       ):
           """
           Test that when create_celery_app() is called with an ExecutorConf 
for a specific team:
           - The team-specific broker_url is not overwritten by default 
celery_config_options.
           - A non-default celery_config_options override (e.g. 
worker_concurrency) is applied for that team.
           """
           # Team-specific broker URL, distinct from any global default.
           team_broker_url = "redis://team-specific-broker:6379/0"
   
           # Use the real ExecutorConf type exposed by celery_executor_utils 
for multi-team mode.
           executor_conf = 
celery_executor_utils.ExecutorConf(team_name="test-team", conf=conf)
   
           # Custom celery_config_options override for this test: change 
worker_concurrency only.
           custom_config = {"worker_concurrency": 99}
   
           original_has_option = conf.has_option
   
           def mock_has_option(section, key, **kwargs):
               if section == "celery" and key == "celery_config_options":
                   return True
               return original_has_option(section, key, **kwargs)
   
           with (
               mock.patch.object(conf, "has_option", 
side_effect=mock_has_option),
               mock.patch.object(conf, "getimport", return_value=custom_config),
               # Ensure the team-specific broker URL is part of the executor 
configuration.
               mock.patch.object(
                   celery_executor_utils,
                   "DEFAULT_CELERY_CONFIG",
                   {**celery_executor_utils.DEFAULT_CELERY_CONFIG, 
"broker_url": team_broker_url},
               ),
           ):
               celery_app = 
celery_executor_utils.create_celery_app(executor_conf)
   
           # Default celery_config_options must not override the team-specific 
broker_url.
           assert celery_app.conf.broker_url == team_broker_url
           # But the non-default celery_config_options override must still be 
applied for this team.
           assert celery_app.conf.worker_concurrency == 99
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to