dandanseo123 commented on code in PR #64392:
URL: https://github.com/apache/airflow/pull/64392#discussion_r3029455459


##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -814,3 +814,101 @@ def test_execute_workload_ignores_already_running_task():
         """
         with pytest.raises(Ignore):
             execute_workload_unwrapped(workload_json)
+
+
+class TestAmqpsSslConfig:
+    """Tests for amqps:// broker URL SSL configuration (Fix for substring 
match bug)."""
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqps_broker_url_builds_ssl_config(self):
+        """Test that amqps:// broker URLs correctly build broker_use_ssl with 
AMQP param names."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config, "broker_use_ssl should be set for 
amqps:// URLs"
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["keyfile"] == "/path/to/key.pem"
+        assert broker_ssl["certfile"] == "/path/to/cert.pem"
+        assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+        # Must NOT have ssl_ prefixed keys (those are for Redis)
+        assert "ssl_keyfile" not in broker_ssl
+        assert "ssl_certfile" not in broker_ssl
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqp://guest:guest@rabbitmq:5672//",
+            ("celery", "SSL_ACTIVE"): "True",
+            ("celery", "SSL_KEY"): "/path/to/key.pem",
+            ("celery", "SSL_CERT"): "/path/to/cert.pem",
+            ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+        }
+    )
+    def test_amqp_broker_url_still_builds_ssl_config(self):
+        """Test that amqp:// (non-TLS) broker URLs still build SSL config 
correctly (no regression)."""
+        import importlib
+        import ssl
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" in config
+        broker_ssl = config["broker_use_ssl"]
+        assert broker_ssl["keyfile"] == "/path/to/key.pem"
+        assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
+    @conf_vars(
+        {
+            ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+            ("celery", "SSL_ACTIVE"): "False",
+        }
+    )
+    def test_amqps_broker_url_no_ssl_when_inactive(self):
+        """Test that amqps:// broker URLs don't get SSL config when SSL_ACTIVE 
is False."""
+        import importlib
+
+        importlib.reload(default_celery)
+
+        config = default_celery.DEFAULT_CELERY_CONFIG
+        assert "broker_use_ssl" not in config
+
+
+class TestCeleryConfigOptionsOverride:
+    """Tests for celery_config_options being applied in create_celery_app()."""
+
+    def test_celery_config_options_applied_in_create_celery_app(self):
+        """Test that celery_config_options overrides are merged into 
create_celery_app() config."""
+        custom_config = {"worker_concurrency": 42, "broker_url": 
"redis://custom:6379/0"}
+
+        original_has_option = conf.has_option
+
+        def mock_has_option(section, key, **kwargs):
+            if section == "celery" and key == "celery_config_options":
+                return True
+            return original_has_option(section, key, **kwargs)
+
+        with (
+            mock.patch.object(conf, "has_option", side_effect=mock_has_option),
+            mock.patch.object(conf, "getimport", return_value=custom_config),
+        ):
+            celery_app = celery_executor_utils.create_celery_app(conf)
+            # The custom config should override defaults
+            assert celery_app.conf.worker_concurrency == 42
+            assert celery_app.conf.broker_url == "redis://custom:6379/0"
+

Review Comment:
   addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to