This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0cf0d63a8cd Fix triggerer KEDA database connection rendering (#67538)
0cf0d63a8cd is described below
commit 0cf0d63a8cd6b234f5fb7bd34a7a11c63a30a770
Author: Henry Chen <[email protected]>
AuthorDate: Wed May 27 04:13:53 2026 +0800
Fix triggerer KEDA database connection rendering (#67538)
---
chart/templates/_helpers.yaml | 5 +-
.../secrets/metadata-connection-secret.yaml | 7 +-
.../helm_tests/airflow_core/test_triggerer.py | 100 ++++++++++++++++++++-
3 files changed, 108 insertions(+), 4 deletions(-)
diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index 3445f256a0f..709e9a22d1b 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -90,7 +90,10 @@ If release name contains chart name it will be used as a
full name.
{{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has
.Values.workers.celery.keda.enabled (list true false)))
.Values.workers.keda.enabled) }}
{{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and
(not (has .Values.workers.celery.keda.usePgbouncer (list true false)))
.Values.workers.keda.usePgbouncer) }}
{{- end }}
- {{- if and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol
"mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }}
+ {{- $triggererKedaEnabled := and .Values.triggerer.enabled
.Values.triggerer.keda.enabled }}
+ {{- $workersKedaNeedsDbConn := and $kedaEnabled (or (eq
.Values.data.metadataConnection.protocol "mysql") (and
.Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }}
+ {{- $triggererKedaNeedsDbConn := and $triggererKedaEnabled (or (eq
.Values.data.metadataConnection.protocol "mysql") (and
.Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer))) }}
+ {{- if or $workersKedaNeedsDbConn $triggererKedaNeedsDbConn }}
- name: KEDA_DB_CONN
valueFrom:
secretKeyRef:
diff --git a/chart/templates/secrets/metadata-connection-secret.yaml
b/chart/templates/secrets/metadata-connection-secret.yaml
index 71441755d4b..93d94af9ead 100644
--- a/chart/templates/secrets/metadata-connection-secret.yaml
+++ b/chart/templates/secrets/metadata-connection-secret.yaml
@@ -37,6 +37,9 @@
{{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has
.Values.workers.celery.keda.enabled (list true false)))
.Values.workers.keda.enabled) }}
{{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and
(not (has .Values.workers.celery.keda.usePgbouncer (list true false)))
.Values.workers.keda.usePgbouncer) }}
{{- end }}
+{{- $triggererKedaEnabled := and .Values.triggerer.enabled
.Values.triggerer.keda.enabled }}
+{{- $workersKedaNeedsPgbouncerBypass := and $kedaEnabled
.Values.pgbouncer.enabled (not $kedaUsePgBouncer) }}
+{{- $triggererKedaNeedsPgbouncerBypass := and $triggererKedaEnabled
.Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer) }}
apiVersion: v1
kind: Secret
metadata:
@@ -58,11 +61,11 @@ data:
{{- with .Values.data.metadataConnection }}
connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s"
($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host
$port) "path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
{{- end }}
- {{- if and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }}
+ {{- if or $workersKedaNeedsPgbouncerBypass
$triggererKedaNeedsPgbouncerBypass }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf
"%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s"
$metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query"
$query) | b64enc | quote }}
{{- end }}
- {{- else if and (or $kedaEnabled .Values.triggerer.keda.enabled) (eq
.Values.data.metadataConnection.protocol "mysql") }}
+ {{- else if and (or $kedaEnabled $triggererKedaEnabled) (eq
.Values.data.metadataConnection.protocol "mysql") }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "userinfo" (printf "%s:%s" ($metadataUser |
urlquery) (.pass | urlquery) ) "host" (printf "tcp(%s:%s)" $metadataHost
$metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) |
trimPrefix "//" | b64enc | quote }}
{{- end }}
diff --git a/chart/tests/helm_tests/airflow_core/test_triggerer.py
b/chart/tests/helm_tests/airflow_core/test_triggerer.py
index 2a3356ab024..a1ecd6763f9 100644
--- a/chart/tests/helm_tests/airflow_core/test_triggerer.py
+++ b/chart/tests/helm_tests/airflow_core/test_triggerer.py
@@ -16,9 +16,11 @@
# under the License.
from __future__ import annotations
+import base64
+
import jmespath
import pytest
-from chart_utils.helm_template_generator import render_chart
+from chart_utils.helm_template_generator import prepare_k8s_lookup_dict,
render_chart
from chart_utils.log_groomer import LogGroomerTestBase
@@ -805,6 +807,18 @@ class TestTriggererLogGroomer(LogGroomerTestBase):
class TestTriggererKedaAutoScaler:
"""Tests triggerer keda autoscaler."""
+ @staticmethod
+ def _render_triggerer_keda_db_connection(values):
+ docs = render_chart(
+ values=values,
+ show_only=[
+ "templates/triggerer/triggerer-deployment.yaml",
+ "templates/triggerer/triggerer-kedaautoscaler.yaml",
+ "templates/secrets/metadata-connection-secret.yaml",
+ ],
+ )
+ return prepare_k8s_lookup_dict(docs)
+
def test_should_add_component_specific_labels(self):
docs = render_chart(
values={
@@ -907,3 +921,87 @@ class TestTriggererKedaAutoScaler:
assert
jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", docs[0])
== "KEDA_DB_CONN"
assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv",
docs[0]) is None
+
+ def test_mysql_db_backend_adds_keda_db_conn_to_triggerer(self):
+ docs_by_key = self._render_triggerer_keda_db_connection(
+ values={
+ "data": {"metadataConnection": {"protocol": "mysql", "port":
3306}},
+ "triggerer": {"keda": {"enabled": True}},
+ }
+ )
+
+ triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
+ keda_autoscaler = docs_by_key[("ScaledObject",
"release-name-triggerer")]
+ metadata_secret = docs_by_key[("Secret", "release-name-metadata")]
+
+ triggerer_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='triggerer'] |
[0].env[].name", triggerer
+ )
+ assert "KEDA_DB_CONN" in triggerer_env_vars
+
+ secret_data = jmespath.search("data", metadata_secret)
+ assert "kedaConnection" in secret_data
+ keda_connection_secret =
base64.b64decode(secret_data["kedaConnection"]).decode()
+ assert not keda_connection_secret.startswith("//")
+
+ assert (
+
jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv",
keda_autoscaler)
+ == "KEDA_DB_CONN"
+ )
+ assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv",
keda_autoscaler) is None
+
+ def test_pgbouncer_bypass_adds_keda_db_conn_to_triggerer(self):
+ docs_by_key = self._render_triggerer_keda_db_connection(
+ values={
+ "pgbouncer": {"enabled": True},
+ "triggerer": {"keda": {"enabled": True, "usePgbouncer":
False}},
+ }
+ )
+
+ triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
+ keda_autoscaler = docs_by_key[("ScaledObject",
"release-name-triggerer")]
+ metadata_secret = docs_by_key[("Secret", "release-name-metadata")]
+
+ triggerer_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='triggerer'] |
[0].env[].name", triggerer
+ )
+ assert "KEDA_DB_CONN" in triggerer_env_vars
+
+ secret_data = jmespath.search("data", metadata_secret)
+ connection_secret =
base64.b64decode(secret_data["connection"]).decode()
+ keda_connection_secret =
base64.b64decode(secret_data["kedaConnection"]).decode()
+ assert "@release-name-pgbouncer" in connection_secret
+ assert ":6543" in connection_secret
+ assert "@release-name-postgresql" in keda_connection_secret
+ assert ":5432" in keda_connection_secret
+ assert "/postgres" in keda_connection_secret
+
+ assert (
+ jmespath.search("spec.triggers[0].metadata.connectionFromEnv",
keda_autoscaler) == "KEDA_DB_CONN"
+ )
+ assert
jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv",
keda_autoscaler) is None
+
+ def
test_postgresql_default_triggerer_keda_uses_airflow_db_connection(self):
+ docs_by_key = self._render_triggerer_keda_db_connection(
+ values={"triggerer": {"keda": {"enabled": True}}}
+ )
+
+ triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
+ keda_autoscaler = docs_by_key[("ScaledObject",
"release-name-triggerer")]
+ metadata_secret = docs_by_key[("Secret", "release-name-metadata")]
+
+ triggerer_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='triggerer'] |
[0].env[].name", triggerer
+ )
+ assert "AIRFLOW_CONN_AIRFLOW_DB" in triggerer_env_vars
+ assert "KEDA_DB_CONN" not in triggerer_env_vars
+
+ secret_data = jmespath.search("data", metadata_secret)
+ assert "connection" in secret_data
+ assert "kedaConnection" not in secret_data
+
+ assert (
+ jmespath.search("spec.triggers[0].metadata.connectionFromEnv",
keda_autoscaler)
+ == "AIRFLOW_CONN_AIRFLOW_DB"
+ )
+ assert
jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv",
keda_autoscaler) is None