This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b84fffbe664 Add workers.celery.keda section (#61820)
b84fffbe664 is described below
commit b84fffbe66452ca25404dd3ffd35cbac6f5ef2fe
Author: Przemysław Mirowski <[email protected]>
AuthorDate: Mon Mar 2 20:55:42 2026 +0100
Add workers.celery.keda section (#61820)
* Add workers.celery.keda section
* Misc & doc
* Add newsfragment
---
chart/docs/keda.rst | 4 +-
chart/newsfragments/61820.significant.rst | 1 +
chart/templates/NOTES.txt | 72 ++++++
chart/templates/_helpers.yaml | 8 +-
.../pgbouncer/pgbouncer-networkpolicy.yaml | 10 +-
.../secrets/metadata-connection-secret.yaml | 10 +-
.../triggerer/triggerer-kedaautoscaler.yaml | 2 +-
chart/values.schema.json | 116 +++++++++-
chart/values.yaml | 48 ++++
.../tests/helm_tests/airflow_core/test_worker.py | 117 +++++++---
.../helm_tests/airflow_core/test_worker_sets.py | 252 +++++++++++++++------
helm-tests/tests/helm_tests/other/test_keda.py | 234 ++++++++++++++++---
.../tests/helm_tests/other/test_pgbouncer.py | 166 +++++++-------
13 files changed, 805 insertions(+), 235 deletions(-)
diff --git a/chart/docs/keda.rst b/chart/docs/keda.rst
index 357eae008c6..34a68294c4d 100644
--- a/chart/docs/keda.rst
+++ b/chart/docs/keda.rst
@@ -41,7 +41,7 @@ One advantage of KEDA is it allows you to scale your
application to/from 0 worke
--namespace keda \
--version "v2.0.0"
-Enable for the Airflow instance by setting ``workers.keda.enabled=true`` in
your
+Enable for the Airflow instance by setting
``workers.celery.keda.enabled=true`` in your
helm command or in the ``values.yaml``.
Make sure ``values.yaml`` shows that either KEDA or HPA is enabled, but not
both.
@@ -56,7 +56,7 @@ They will compete with each other resulting in odd scaling
behavior.
helm install airflow apache-airflow/airflow \
--namespace airflow \
--set executor=CeleryExecutor \
- --set workers.keda.enabled=true
+ --set workers.celery.keda.enabled=true
A ``ScaledObject`` and an ``hpa`` will be created in the Airflow namespace.
diff --git a/chart/newsfragments/61820.significant.rst
b/chart/newsfragments/61820.significant.rst
new file mode 100644
index 00000000000..b6e72e39a2f
--- /dev/null
+++ b/chart/newsfragments/61820.significant.rst
@@ -0,0 +1 @@
+``workers.keda`` section is now deprecated in favor of
``workers.celery.keda``. Please update your configuration accordingly.
diff --git a/chart/templates/NOTES.txt b/chart/templates/NOTES.txt
index 8e0f30a5f24..2c4ad458323 100644
--- a/chart/templates/NOTES.txt
+++ b/chart/templates/NOTES.txt
@@ -399,6 +399,78 @@ DEPRECATION WARNING:
{{- end }}
+{{- if .Values.workers.keda.enabled }}
+
+ DEPRECATION WARNING:
+ `workers.keda.enabled` has been renamed to `workers.celery.keda.enabled`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if not (empty .Values.workers.keda.namespaceLabels) }}
+
+ DEPRECATION WARNING:
+ `workers.keda.namespaceLabels` has been renamed to
`workers.celery.keda.namespaceLabels`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if ne (int .Values.workers.keda.pollingInterval) 5 }}
+
+ DEPRECATION WARNING:
+ `workers.keda.pollingInterval` has been renamed to
`workers.celery.keda.pollingInterval`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if ne (int .Values.workers.keda.cooldownPeriod) 30 }}
+
+ DEPRECATION WARNING:
+ `workers.keda.cooldownPeriod` has been renamed to
`workers.celery.keda.cooldownPeriod`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if ne (int .Values.workers.keda.minReplicaCount) 0 }}
+
+ DEPRECATION WARNING:
+ `workers.keda.minReplicaCount` has been renamed to
`workers.celery.keda.minReplicaCount`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if ne (int .Values.workers.keda.maxReplicaCount) 10 }}
+
+ DEPRECATION WARNING:
+ `workers.keda.maxReplicaCount` has been renamed to
`workers.celery.keda.maxReplicaCount`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if not (empty .Values.workers.keda.advanced) }}
+
+ DEPRECATION WARNING:
+ `workers.keda.advanced` has been renamed to `workers.celery.keda.advanced`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if ne (quote .Values.workers.keda.query) (quote "SELECT
ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }}) FROM
task_instance WHERE (state='running' OR state='queued') AND queue IN ( {{-
range $i, $q := splitList \",\" .Values.workers.queue -}} {{- if $i }},{{ end
}}'{{ $q | trim }}' {{- end -}} ) {{- if contains \"CeleryKubernetesExecutor\"
.Values.executor }} AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}' {{- else if
contains \ [...]
+
+ DEPRECATION WARNING:
+ `workers.keda.query` has been renamed to `workers.celery.keda.query`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
+{{- if not .Values.workers.keda.usePgbouncer }}
+
+ DEPRECATION WARNING:
+ `workers.keda.usePgbouncer` has been renamed to
`workers.celery.keda.usePgbouncer`.
+ Please change your values as support for the old name will be dropped in a
future release.
+
+{{- end }}
+
{{- if not (empty .Values.webserver.defaultUser) }}
DEPRECATION WARNING:
diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index e7c5edda247..ceb0041a7b7 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -76,7 +76,13 @@ If release name contains chart name it will be used as a
full name.
name: {{ template "airflow_metadata_secret" . }}
key: connection
{{- end }}
- {{- if and .Values.workers.keda.enabled (or (eq
.Values.data.metadataConnection.protocol "mysql") (and
.Values.pgbouncer.enabled (not .Values.workers.keda.usePgbouncer))) }}
+ {{- $kedaEnabled := .Values.workers.keda.enabled }}
+ {{- $kedaUsePgBouncer := .Values.workers.keda.usePgbouncer }}
+ {{- if hasKey .Values.workers "celery" }}
+ {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has
.Values.workers.celery.keda.enabled (list true false)))
.Values.workers.keda.enabled) }}
+ {{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and
(not (has .Values.workers.celery.keda.usePgbouncer (list true false)))
.Values.workers.keda.usePgbouncer) }}
+ {{- end }}
+ {{- if and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol
"mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }}
- name: KEDA_DB_CONN
valueFrom:
secretKeyRef:
diff --git a/chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
b/chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
index 7694384fb20..f80f3e1264d 100644
--- a/chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
+++ b/chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
@@ -20,7 +20,11 @@
################################
## Pgbouncer NetworkPolicy
#################################
-{{- $workersKedaEnabled := and .Values.workers.keda.enabled (has
.Values.executor (list "CeleryExecutor" "CeleryKubernetesExecutor")) }}
+{{- $kedaEnabled := .Values.workers.keda.enabled }}
+{{- if hasKey .Values.workers "celery" }}
+ {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has
.Values.workers.celery.keda.enabled (list true false)))
.Values.workers.keda.enabled) }}
+{{- end }}
+{{- $workersKedaEnabled := and $kedaEnabled (has .Values.executor (list
"CeleryExecutor" "CeleryKubernetesExecutor")) }}
{{- $triggererEnabled := .Values.triggerer.enabled }}
{{- $triggererKedaEnabled := and $triggererEnabled
.Values.triggerer.keda.enabled }}
{{- if and .Values.pgbouncer.enabled .Values.networkPolicies.enabled }}
@@ -52,9 +56,9 @@ spec:
tier: airflow
release: {{ .Release.Name }}
{{- if or $workersKedaEnabled $triggererKedaEnabled }}
- {{- if and $workersKedaEnabled .Values.workers.keda.namespaceLabels }}
+ {{- if and $workersKedaEnabled
(.Values.workers.celery.keda.namespaceLabels | default
.Values.workers.keda.namespaceLabels) }}
- namespaceSelector:
- matchLabels: {{- toYaml .Values.workers.keda.namespaceLabels | nindent
10 }}
+ matchLabels: {{- toYaml (.Values.workers.celery.keda.namespaceLabels |
default .Values.workers.keda.namespaceLabels) | nindent 10 }}
podSelector:
{{- else if and $triggererEnabled .Values.triggerer.keda.namespaceLabels }}
- namespaceSelector:
diff --git a/chart/templates/secrets/metadata-connection-secret.yaml
b/chart/templates/secrets/metadata-connection-secret.yaml
index 2e53dd308cf..04e3fe7bc40 100644
--- a/chart/templates/secrets/metadata-connection-secret.yaml
+++ b/chart/templates/secrets/metadata-connection-secret.yaml
@@ -30,6 +30,12 @@
{{- $metadataDatabase := .Values.data.metadataConnection.db }}
{{- $database := (ternary (printf "%s-%s" .Release.Name "metadata")
$metadataDatabase .Values.pgbouncer.enabled) }}
{{- $query := ternary (printf "sslmode=%s"
.Values.data.metadataConnection.sslmode) "" (eq
.Values.data.metadataConnection.protocol "postgresql") }}
+{{- $kedaEnabled := .Values.workers.keda.enabled }}
+{{- $kedaUsePgBouncer := .Values.workers.keda.usePgbouncer }}
+{{- if hasKey .Values.workers "celery" }}
+ {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has
.Values.workers.celery.keda.enabled (list true false)))
.Values.workers.keda.enabled) }}
+ {{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and
(not (has .Values.workers.celery.keda.usePgbouncer (list true false)))
.Values.workers.keda.usePgbouncer) }}
+{{- end }}
apiVersion: v1
kind: Secret
metadata:
@@ -51,11 +57,11 @@ data:
{{- with .Values.data.metadataConnection }}
connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s"
(.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host $port)
"path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
{{- end }}
- {{- if and .Values.workers.keda.enabled .Values.pgbouncer.enabled (not
.Values.workers.keda.usePgbouncer) }}
+ {{- if and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf
"%s:%s" (.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s"
$metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query"
$query) | b64enc | quote }}
{{- end }}
- {{- else if and (or .Values.workers.keda.enabled
.Values.triggerer.keda.enabled) (eq .Values.data.metadataConnection.protocol
"mysql") }}
+ {{- else if and (or $kedaEnabled .Values.triggerer.keda.enabled) (eq
.Values.data.metadataConnection.protocol "mysql") }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "userinfo" (printf "%s:%s" (.user |
urlquery) (.pass | urlquery) ) "host" (printf "tcp(%s:%s)" $metadataHost
$metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) |
trimPrefix "//" | b64enc | quote }}
{{- end }}
diff --git a/chart/templates/triggerer/triggerer-kedaautoscaler.yaml
b/chart/templates/triggerer/triggerer-kedaautoscaler.yaml
index da9a48cf06a..fa54aa37367 100644
--- a/chart/templates/triggerer/triggerer-kedaautoscaler.yaml
+++ b/chart/templates/triggerer/triggerer-kedaautoscaler.yaml
@@ -40,7 +40,7 @@ spec:
kind: {{ ternary "StatefulSet" "Deployment"
.Values.triggerer.persistence.enabled }}
name: {{ include "airflow.fullname" . }}-triggerer
envSourceContainerName: triggerer
- pollingInterval: {{ .Values.triggerer.keda.pollingInterval }}
+ pollingInterval: {{ .Values.triggerer.keda.pollingInterval }}
cooldownPeriod: {{ .Values.triggerer.keda.cooldownPeriod }}
minReplicaCount: {{ .Values.triggerer.keda.minReplicaCount }}
maxReplicaCount: {{ .Values.triggerer.keda.maxReplicaCount }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 1e4de6ac25a..caac8a828f4 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -1879,17 +1879,17 @@
}
},
"keda": {
- "description": "KEDA configuration of Airflow Celery
workers.",
+ "description": "KEDA configuration of Airflow Celery
workers (deprecated, use `workers.celery.keda` instead).",
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
- "description": "Allow KEDA autoscaling.",
+ "description": "Allow KEDA autoscaling
(deprecated, use `workers.celery.keda.enabled` instead).",
"type": "boolean",
"default": false
},
"namespaceLabels": {
- "description": "Labels used in `matchLabels` for
namespace in the PgBouncer NetworkPolicy.",
+ "description": "Labels used in `matchLabels` for
namespace in the PgBouncer NetworkPolicy (deprecated, use
`workers.celery.keda.namespaceLabels` instead).",
"type": "object",
"default": {},
"additionalProperties": {
@@ -1897,38 +1897,38 @@
}
},
"pollingInterval": {
- "description": "How often KEDA polls the airflow
DB to report new scale requests to the HPA.",
+ "description": "How often KEDA polls the airflow
DB to report new scale requests to the HPA (deprecated, use
`workers.celery.keda.pollingInterval` instead).",
"type": "integer",
"default": 5
},
"cooldownPeriod": {
- "description": "How many seconds KEDA will wait
before scaling to zero.",
+ "description": "How many seconds KEDA will wait
before scaling to zero (deprecated, use `workers.celery.keda.cooldownPeriod`
instead).",
"type": "integer",
"default": 30
},
"minReplicaCount": {
- "description": "Minimum number of Airflow Celery
workers created by KEDA.",
+ "description": "Minimum number of Airflow Celery
workers created by KEDA (deprecated, use `workers.celery.keda.minReplicaCount`
instead).",
"type": "integer",
"default": 0
},
"maxReplicaCount": {
- "description": "Maximum number of Airflow Celery
workers created by KEDA.",
+ "description": "Maximum number of Airflow Celery
workers created by KEDA (deprecated, use `workers.celery.keda.maxReplicaCount`
instead).",
"type": "integer",
"default": 10
},
"advanced": {
- "description": "Advanced KEDA configuration.",
+ "description": "Advanced KEDA configuration
(deprecated, use `workers.celery.keda.advanced` instead).",
"type": "object",
"default": {},
"additionalProperties": false,
"properties": {
"horizontalPodAutoscalerConfig": {
- "description":
"HorizontalPodAutoscalerConfig specifies horizontal scale config.",
+ "description":
"HorizontalPodAutoscalerConfig specifies horizontal scale config (deprecated,
use `workers.celery.keda.advanced.horizontalPodAutoscalerConfig` instead).",
"type": "object",
"default": {},
"properties": {
"behavior": {
- "description":
"HorizontalPodAutoscalerBehavior configures the scaling behavior of the
target.",
+ "description":
"HorizontalPodAutoscalerBehavior configures the scaling behavior of the target
(deprecated, use
`workers.celery.keda.advanced.horizontalPodAutoscalerConfig.behavior`
instead).",
"type": "object",
"default": {},
"$ref":
"#/definitions/io.k8s.api.autoscaling.v2.HorizontalPodAutoscalerBehavior"
@@ -1938,12 +1938,12 @@
}
},
"query": {
- "description": "Query to use for KEDA autoscaling.
Must return a single integer.",
+ "description": "Query to use for KEDA autoscaling
(deprecated, use `workers.celery.keda.query` instead). Must return a single
integer.",
"type": "string",
"default": "SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }}) FROM task_instance WHERE
(state='running' OR state='queued') AND queue IN ( {{- range $i, $q :=
splitList \",\" .Values.workers.queue -}} {{- if $i }},{{ end }}'{{ $q | trim
}}' {{- end -}} ) {{- if contains \"CeleryKubernetesExecutor\" .Values.executor
}} AND queue != '{{ .Values.config.celery_kubernetes_executor.kubernetes_queue
}}' {{- else if contains \"KubernetesEx [...]
},
"usePgbouncer": {
- "description": "Weather to use PGBouncer to
connect to the database or not when it is enabled. This configuration will be
ignored if PGBouncer is not enabled.",
+ "description": "Weather to use PGBouncer to
connect to the database or not when it is enabled (deprecated, use
`workers.celery.keda.usePgbouncer` instead). This configuration will be ignored
if PGBouncer is not enabled.",
"type": "boolean",
"default": true
}
@@ -2919,6 +2919,98 @@
}
}
},
+ "keda": {
+ "description": "KEDA configuration of Airflow
Celery workers.",
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "enabled": {
+ "description": "Allow KEDA autoscaling.",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "default": null
+ },
+ "namespaceLabels": {
+ "description": "Labels used in
`matchLabels` for namespace in the PgBouncer NetworkPolicy.",
+ "type": "object",
+ "default": {},
+ "additionalProperties": {
+ "type": "string"
+ }
+ },
+ "pollingInterval": {
+ "description": "How often KEDA polls the
airflow DB to report new scale requests to the HPA.",
+ "type": [
+ "integer",
+ "null"
+ ],
+ "default": null
+ },
+ "cooldownPeriod": {
+ "description": "How many seconds KEDA will
wait before scaling to zero.",
+ "type": [
+ "integer",
+ "null"
+ ],
+ "default": null
+ },
+ "minReplicaCount": {
+ "description": "Minimum number of Airflow
Celery workers created by KEDA.",
+ "type": [
+ "integer",
+ "null"
+ ],
+ "default": null
+ },
+ "maxReplicaCount": {
+ "description": "Maximum number of Airflow
Celery workers created by KEDA.",
+ "type": [
+ "integer",
+ "null"
+ ],
+ "default": null
+ },
+ "advanced": {
+ "description": "Advanced KEDA
configuration.",
+ "type": "object",
+ "default": {},
+ "additionalProperties": false,
+ "properties": {
+ "horizontalPodAutoscalerConfig": {
+ "description":
"HorizontalPodAutoscalerConfig specifies horizontal scale config.",
+ "type": "object",
+ "default": {},
+ "properties": {
+ "behavior": {
+ "description":
"HorizontalPodAutoscalerBehavior configures the scaling behavior of the
target.",
+ "type": "object",
+ "default": {},
+ "$ref":
"#/definitions/io.k8s.api.autoscaling.v2.HorizontalPodAutoscalerBehavior"
+ }
+ }
+ }
+ }
+ },
+ "query": {
+ "description": "Query to use for KEDA
autoscaling. Must return a single integer.",
+ "type": [
+ "string",
+ "null"
+ ],
+ "default": null
+ },
+ "usePgbouncer": {
+ "description": "Weather to use PGBouncer
to connect to the database or not when it is enabled. This configuration will
be ignored if PGBouncer is not enabled.",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "default": null
+ }
+ }
+ },
"persistence": {
"description": "Persistence configuration for
Airflow Celery workers.",
"type": "object",
diff --git a/chart/values.yaml b/chart/values.yaml
index 11c9d96c5af..17b1e14f513 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -733,24 +733,33 @@ workers:
annotations: {}
# Allow KEDA autoscaling for Airflow Celery workers
+ # (deprecated, use `workers.celery.keda` instead)
keda:
+ # (deprecated, use `workers.celery.keda.enabled` instead)
enabled: false
+
+ # (deprecated, use `workers.celery.keda.namespaceLabels` instead)
namespaceLabels: {}
# How often KEDA polls the airflow DB to report new scale requests to the
HPA
+ # (deprecated, use `workers.celery.keda.pollingInterval` instead)
pollingInterval: 5
# How many seconds KEDA will wait before scaling to zero.
# Note that HPA has a separate cooldown period for scale-downs
+ # (deprecated, use `workers.celery.keda.cooldownPeriod` instead)
cooldownPeriod: 30
# Minimum number of Airflow Celery workers created by keda
+ # (deprecated, use `workers.celery.keda.minReplicaCount` instead)
minReplicaCount: 0
# Maximum number of Airflow Celery workers created by keda
+ # (deprecated, use `workers.celery.keda.maxReplicaCount` instead)
maxReplicaCount: 10
# Specify HPA related options
+ # (deprecated, use `workers.celery.keda.advanced` instead)
advanced: {}
# horizontalPodAutoscalerConfig:
# behavior:
@@ -762,6 +771,7 @@ workers:
# periodSeconds: 15
# Query to use for KEDA autoscaling. Must return a single integer.
+ # (deprecated, use `workers.celery.keda.query` instead)
query: >-
SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }})
FROM task_instance
@@ -781,6 +791,7 @@ workers:
# Weather to use PGBouncer to connect to the database or not when it is
enabled
# This configuration will be ignored if PGBouncer is not enabled
+ # (deprecated, use `workers.celery.keda.usePgbouncer` instead)
usePgbouncer: true
# Allow HPA for Airflow Celery workers (KEDA must be disabled)
@@ -1125,6 +1136,43 @@ workers:
maxUnavailable: ~
# minAvailable: ~
+ # Allow KEDA autoscaling for Airflow Celery workers
+ keda:
+ enabled: ~
+
+ namespaceLabels: {}
+
+ # How often KEDA polls the airflow DB to report new scale requests to
the HPA
+ pollingInterval: ~
+
+ # How many seconds KEDA will wait before scaling to zero.
+ # Note that HPA has a separate cooldown period for scale-downs
+ cooldownPeriod: ~
+
+ # Minimum number of Airflow Celery workers created by keda
+ minReplicaCount: ~
+
+ # Maximum number of Airflow Celery workers created by keda
+ maxReplicaCount: ~
+
+ # Specify HPA related options
+ advanced: {}
+ # horizontalPodAutoscalerConfig:
+ # behavior:
+ # scaleDown:
+ # stabilizationWindowSeconds: 300
+ # policies:
+ # - type: Percent
+ # value: 100
+ # periodSeconds: 15
+
+ # Query to use for KEDA autoscaling. Must return a single integer
+ query: ~
+
+ # Weather to use PGBouncer to connect to the database or not when it is
enabled
+ # This configuration will be ignored if PGBouncer is not enabled
+ usePgbouncer: ~
+
# Persistence volume configuration for Airflow Celery workers
persistence:
# Enable persistent volumes
diff --git a/helm-tests/tests/helm_tests/airflow_core/test_worker.py
b/helm-tests/tests/helm_tests/airflow_core/test_worker.py
index 07c93936722..0d29a8429ab 100644
--- a/helm-tests/tests/helm_tests/airflow_core/test_worker.py
+++ b/helm-tests/tests/helm_tests/airflow_core/test_worker.py
@@ -1739,7 +1739,7 @@ class TestWorkerKedaAutoScaler:
values={
"executor": "CeleryExecutor",
"workers": {
- "keda": {"enabled": True},
+ "celery": {"keda": {"enabled": True}},
"labels": {"test_label": "test_label_value"},
},
},
@@ -1754,7 +1754,7 @@ class TestWorkerKedaAutoScaler:
values={
"executor": "CeleryExecutor",
"workers": {
- "keda": {"enabled": True},
+ "celery": {"keda": {"enabled": True}},
},
},
show_only=["templates/workers/worker-deployment.yaml"],
@@ -1763,56 +1763,110 @@ class TestWorkerKedaAutoScaler:
assert "replicas" not in jmespath.search("spec", docs[0])
@pytest.mark.parametrize(
- ("query", "executor", "expected_query"),
+ ("executor", "expected_query"),
[
- # default query with CeleryExecutor
(
- None,
"CeleryExecutor",
- "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
- " WHERE (state='running' OR state='queued') AND queue IN
('default')",
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE
(state='running' OR state='queued') AND queue IN ('default')",
),
- # default query with CeleryKubernetesExecutor
(
- None,
"CeleryKubernetesExecutor",
- "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
- " WHERE (state='running' OR state='queued') AND queue IN
('default') AND queue != 'kubernetes'",
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE
(state='running' OR state='queued') AND queue IN ('default') AND queue !=
'kubernetes'",
+ ),
+ ],
+ )
+ def test_keda_query_default(self, executor, expected_query):
+ docs = render_chart(
+ values={
+ "executor": executor,
+ "workers": {
+ "celery": {"keda": {"enabled": True}},
+ },
+ },
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+ assert expected_query ==
jmespath.search("spec.triggers[0].metadata.query", docs[0])
+
+ @pytest.mark.parametrize(
+ ("workers_values", "executor"),
+ [
+ # workers - test custom static query
+ (
+ {
+ "keda": {
+ "enabled": True,
+ "query": "SELECT ceil(COUNT(*)::decimal / 32) FROM
task_instance",
+ }
+ },
+ "CeleryKubernetesExecutor",
+ ),
+ # workers - test custom template query
+ (
+ {
+ "keda": {
+ "enabled": True,
+ "query": "SELECT ceil(COUNT(*)::decimal / {{ mul
.Values.config.celery.worker_concurrency 2 }}) FROM task_instance",
+ }
+ },
+ "CeleryKubernetesExecutor",
),
- # test custom static query
+ # workers.celery - test custom static query
(
- "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
+ {
+ "celery": {
+ "keda": {
+ "enabled": True,
+ "query": "SELECT ceil(COUNT(*)::decimal / 32) FROM
task_instance",
+ }
+ }
+ },
"CeleryKubernetesExecutor",
- "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
),
- # test custom template query
+ # workers.celery - test custom template query
(
- "SELECT ceil(COUNT(*)::decimal / {{ mul
.Values.config.celery.worker_concurrency 2 }})"
- " FROM task_instance",
+ {
+ "celery": {
+ "keda": {
+ "enabled": True,
+ "query": "SELECT ceil(COUNT(*)::decimal / {{ mul
.Values.config.celery.worker_concurrency 2 }}) FROM task_instance",
+ }
+ }
+ },
+ "CeleryKubernetesExecutor",
+ ),
+ # workers - workers.celery overwrite
+ (
+ {
+ "keda": {"query": "SELECT ceil(COUNT(*)::decimal / 16)
FROM task_instance"},
+ "celery": {
+ "keda": {
+ "enabled": True,
+ "query": "SELECT ceil(COUNT(*)::decimal / 32) FROM
task_instance",
+ }
+ },
+ },
"CeleryKubernetesExecutor",
- "SELECT ceil(COUNT(*)::decimal / 32) FROM task_instance",
),
],
)
- def test_should_use_keda_query(self, query, executor, expected_query):
+ def test_keda_query_overwrite(self, workers_values, executor):
docs = render_chart(
values={
"executor": executor,
- "workers": {
- "keda": {"enabled": True, **({"query": query} if query
else {})},
- },
+ "workers": workers_values,
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
- assert expected_query ==
jmespath.search("spec.triggers[0].metadata.query", docs[0])
+ assert (
+ jmespath.search("spec.triggers[0].metadata.query", docs[0])
+ == "SELECT ceil(COUNT(*)::decimal / 32) FROM task_instance"
+ )
def test_mysql_db_backend_keda_worker(self):
docs = render_chart(
values={
"data": {"metadataConnection": {"protocol": "mysql"}},
- "workers": {
- "keda": {"enabled": True},
- },
+ "workers": {"celery": {"keda": {"enabled": True}}},
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
@@ -1826,12 +1880,19 @@ class TestWorkerKedaAutoScaler:
class TestWorkerHPAAutoScaler:
"""Tests worker HPA auto scaler."""
- def test_should_be_disabled_on_keda_enabled(self):
+ @pytest.mark.parametrize(
+ "workers_keda_values",
+ [
+ {"keda": {"enabled": True}},
+ {"celery": {"keda": {"enabled": True}}},
+ ],
+ )
+ def test_should_be_disabled_on_keda_enabled(self, workers_keda_values):
docs = render_chart(
values={
"executor": "CeleryExecutor",
"workers": {
- "keda": {"enabled": True},
+ **workers_keda_values,
"hpa": {"enabled": True},
"labels": {"test_label": "test_label_value"},
},
diff --git a/helm-tests/tests/helm_tests/airflow_core/test_worker_sets.py
b/helm-tests/tests/helm_tests/airflow_core/test_worker_sets.py
index e242139c611..b91abf3c1bb 100644
--- a/helm-tests/tests/helm_tests/airflow_core/test_worker_sets.py
+++ b/helm-tests/tests/helm_tests/airflow_core/test_worker_sets.py
@@ -1295,8 +1295,8 @@ class TestWorkerSets:
name="test",
values={
"workers": {
- "keda": {"enabled": True},
"celery": {
+ "keda": {"enabled": True},
"enableDefault": enable_default,
"sets": [
{"name": "set1"},
@@ -1322,77 +1322,133 @@ class TestWorkerSets:
assert len(docs) == 1
- def test_overwrite_keda_disable(self):
- docs = render_chart(
- values={
- "workers": {
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"enabled": True},
+ "celery": {"enableDefault": False, "sets": [{"name": "test",
"keda": {"enabled": False}}]},
+ },
+ {
+ "celery": {
"keda": {"enabled": True},
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": False}}],
- },
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": False}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_disable(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert len(docs) == 0
- def test_overwrite_keda_pooling_interval(self):
- docs = render_chart(
- values={
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"pollingInterval": 10}}],
- },
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"pollingInterval": 1},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"pollingInterval": 10}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"pollingInterval": 1},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"pollingInterval": 10}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_pooling_interval(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert jmespath.search("spec.pollingInterval", docs[0]) == 10
- def test_overwrite_keda_cooldown_period(self):
- docs = render_chart(
- values={
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"cooldownPeriod": 10}}],
- },
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"cooldownPeriod": 1},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"cooldownPeriod": 10}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"cooldownPeriod": 1},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"cooldownPeriod": 10}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_cooldown_period(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert jmespath.search("spec.cooldownPeriod", docs[0]) == 10
- def test_overwrite_keda_min_replica_count(self):
- docs = render_chart(
- values={
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"minReplicaCount": 10}}],
- },
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"minReplicaCount": 1},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"minReplicaCount": 10}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"minReplicaCount": 1},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"minReplicaCount": 10}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_min_replica_count(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert jmespath.search("spec.minReplicaCount", docs[0]) == 10
- def test_overwrite_keda_max_replica_count(self):
- docs = render_chart(
- values={
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"maxReplicaCount": 5}}],
- },
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"maxReplicaCount": 1},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"maxReplicaCount": 5}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"maxReplicaCount": 1},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"maxReplicaCount": 5}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_max_replica_count(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
@@ -1448,6 +1504,35 @@ class TestWorkerSets:
],
},
},
+ {
+ "celery": {
+ "keda": {
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {
+ "scaleDown": {
+ "policies": [{"type": "Percent",
"value": 100, "periodSeconds": 15}]
+ }
+ }
+ }
+ }
+ },
+ "enableDefault": False,
+ "sets": [
+ {
+ "name": "test",
+ "keda": {
+ "enabled": True,
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {"scaleDown":
{"stabilizationWindowSeconds": 300}}
+ }
+ },
+ },
+ }
+ ],
+ },
+ },
],
)
def test_overwrite_keda_advanced(self, workers_values):
@@ -1460,32 +1545,57 @@ class TestWorkerSets:
"horizontalPodAutoscalerConfig": {"behavior": {"scaleDown":
{"stabilizationWindowSeconds": 300}}}
}
- def test_overwrite_keda_query(self):
- docs = render_chart(
- values={
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"query": "test"}}],
- },
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"query": "not"},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"query": "test"}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"query": "not"},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"query": "test"}}],
}
},
+ ],
+ )
+ def test_overwrite_keda_query(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) ==
"test"
- def test_overwrite_keda_use_pgbouncer_enable(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"usePgbouncer": False},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": True}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"usePgbouncer": False},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": True}}],
+ }
+ },
+ ],
+ )
+ def test_overwrite_keda_use_pgbouncer_enable(self, workers_values):
docs = render_chart(
values={
"pgbouncer": {"enabled": True},
- "workers": {
- "keda": {"usePgbouncer": False},
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": True}}],
- },
- },
+ "workers": workers_values,
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
@@ -1495,16 +1605,30 @@ class TestWorkerSets:
== "AIRFLOW_CONN_AIRFLOW_DB"
)
- def test_overwrite_keda_use_pgbouncer_disable(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {"usePgbouncer": True},
+ "celery": {
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": False}}],
+ },
+ },
+ {
+ "celery": {
+ "keda": {"usePgbouncer": True},
+ "enableDefault": False,
+ "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": False}}],
+ }
+ },
+ ],
+ )
+ def test_overwrite_keda_use_pgbouncer_disable(self, workers_values):
docs = render_chart(
values={
"pgbouncer": {"enabled": True},
- "workers": {
- "celery": {
- "enableDefault": False,
- "sets": [{"name": "test", "keda": {"enabled": True,
"usePgbouncer": False}}],
- },
- },
+ "workers": workers_values,
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
diff --git a/helm-tests/tests/helm_tests/other/test_keda.py
b/helm-tests/tests/helm_tests/other/test_keda.py
index 4ed70c20a8b..1d0b9e4191e 100644
--- a/helm-tests/tests/helm_tests/other/test_keda.py
+++ b/helm-tests/tests/helm_tests/other/test_keda.py
@@ -32,6 +32,13 @@ class TestKeda:
)
assert docs == []
+ def test_keda_disabled_overwrite(self):
+ docs = render_chart(
+ values={"workers": {"keda": {"enabled": True}, "celery": {"keda":
{"enabled": False}}}},
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+ assert docs == []
+
@pytest.mark.parametrize(
"executor",
[
@@ -40,11 +47,18 @@ class TestKeda:
"CeleryExecutor,KubernetesExecutor",
],
)
- def test_keda_enabled(self, executor):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"enabled": True}, "celery": {"persistence": {"enabled":
False}}},
+ {"celery": {"keda": {"enabled": True}, "persistence": {"enabled":
False}}},
+ ],
+ )
+ def test_keda_enabled(self, executor, workers_values):
"""ScaledObject should only be created when enabled and executor is
Celery or CeleryKubernetes."""
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True}, "celery":
{"persistence": {"enabled": False}}},
+ "workers": workers_values,
"executor": executor,
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
@@ -58,7 +72,7 @@ class TestKeda:
def test_include_event_source_container_name_in_scaled_object(self,
executor):
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True}, "celery":
{"persistence": {"enabled": False}}},
+ "workers": {"celery": {"keda": {"enabled": True},
"persistence": {"enabled": False}}},
"executor": executor,
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
@@ -68,9 +82,77 @@ class TestKeda:
@pytest.mark.parametrize(
"executor", ["CeleryExecutor", "CeleryKubernetesExecutor",
"CeleryExecutor,KubernetesExecutor"]
)
- def test_keda_advanced(self, executor):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {
+ "keda": {
+ "enabled": True,
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {
+ "scaleDown": {
+ "stabilizationWindowSeconds": 300,
+ "policies": [{"type": "Percent", "value":
100, "periodSeconds": 15}],
+ }
+ }
+ }
+ },
+ }
+ },
+ {
+ "celery": {
+ "keda": {
+ "enabled": True,
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {
+ "scaleDown": {
+ "stabilizationWindowSeconds": 300,
+ "policies": [{"type": "Percent",
"value": 100, "periodSeconds": 15}],
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ {
+ "keda": {
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {"scaleUp":
{"stabilizationWindowSeconds": 5}}
+ }
+ }
+ },
+ "celery": {
+ "keda": {
+ "enabled": True,
+ "advanced": {
+ "horizontalPodAutoscalerConfig": {
+ "behavior": {
+ "scaleDown": {
+ "stabilizationWindowSeconds": 300,
+ "policies": [{"type": "Percent",
"value": 100, "periodSeconds": 15}],
+ }
+ }
+ }
+ },
+ }
+ },
+ },
+ ],
+ )
+ def test_keda_advanced(self, executor, workers_values):
"""Verify keda advanced config."""
- expected_advanced = {
+ docs = render_chart(
+ values={
+ "workers": workers_values,
+ "executor": executor,
+ },
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+ assert jmespath.search("spec.advanced", docs[0]) == {
"horizontalPodAutoscalerConfig": {
"behavior": {
"scaleDown": {
@@ -80,19 +162,6 @@ class TestKeda:
}
}
}
- docs = render_chart(
- values={
- "workers": {
- "keda": {
- "enabled": True,
- "advanced": expected_advanced,
- },
- },
- "executor": executor,
- },
- show_only=["templates/workers/worker-kedaautoscaler.yaml"],
- )
- assert jmespath.search("spec.advanced", docs[0]) == expected_advanced
@staticmethod
def build_query(executor, concurrency=16, kubernetes_queue=None,
worker_queues="default"):
@@ -133,7 +202,7 @@ class TestKeda:
"""Verify keda sql query uses configured concurrency."""
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True}, "celery":
{"persistence": {"enabled": False}}},
+ "workers": {"celery": {"keda": {"enabled": True},
"persistence": {"enabled": False}}},
"executor": executor,
"config": {"celery": {"worker_concurrency": concurrency}},
},
@@ -161,7 +230,7 @@ class TestKeda:
and we also verify here that we use the configured queue name in that
case.
"""
values = {
- "workers": {"keda": {"enabled": True}, "celery": {"persistence":
{"enabled": False}}},
+ "workers": {"celery": {"keda": {"enabled": True}, "persistence":
{"enabled": False}}},
"executor": executor,
}
if queue:
@@ -175,19 +244,19 @@ class TestKeda:
assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) ==
expected_query
@pytest.mark.parametrize(
- ("workers_persistence_values", "kind"),
+ ("workers_values", "kind"),
[
- ({"celery": {"persistence": {"enabled": True}}}, "StatefulSet"),
- ({"celery": {"persistence": {"enabled": False}}}, "Deployment"),
- ({"persistence": {"enabled": True}}, "StatefulSet"),
- ({"persistence": {"enabled": False}}, "Deployment"),
+ ({"celery": {"keda": {"enabled": True}, "persistence": {"enabled":
True}}}, "StatefulSet"),
+ ({"celery": {"keda": {"enabled": True}, "persistence": {"enabled":
False}}}, "Deployment"),
+ ({"persistence": {"enabled": True}, "celery": {"keda": {"enabled":
True}}}, "StatefulSet"),
+ ({"persistence": {"enabled": False}, "celery": {"keda":
{"enabled": True}}}, "Deployment"),
],
)
- def test_persistence(self, workers_persistence_values, kind):
+ def test_persistence(self, workers_values, kind):
"""If worker persistence is enabled, scaleTargetRef should be
StatefulSet else Deployment."""
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True},
**workers_persistence_values},
+ "workers": workers_values,
"executor": "CeleryExecutor",
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
@@ -195,13 +264,20 @@ class TestKeda:
assert jmespath.search("spec.scaleTargetRef.kind", docs[0]) == kind
- def test_default_keda_db_connection(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"enabled": True}},
+ {"celery": {"keda": {"enabled": True}}},
+ ],
+ )
+ def test_default_keda_db_connection(self, workers_values):
"""Verify default keda db connection."""
import base64
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True}},
+ "workers": workers_values,
"executor": "CeleryExecutor",
},
show_only=[
@@ -230,13 +306,20 @@ class TestKeda:
)
assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
- def test_default_keda_db_connection_pgbouncer_enabled(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"enabled": True}},
+ {"celery": {"keda": {"enabled": True}}},
+ ],
+ )
+ def test_default_keda_db_connection_pgbouncer_enabled(self,
workers_values):
"""Verify keda db connection when pgbouncer is enabled."""
import base64
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True}},
+ "workers": workers_values,
"executor": "CeleryExecutor",
"pgbouncer": {"enabled": True},
},
@@ -266,13 +349,20 @@ class TestKeda:
)
assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
- def
test_default_keda_db_connection_pgbouncer_enabled_usePgbouncer_false(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"enabled": True, "usePgbouncer": False}},
+ {"celery": {"keda": {"enabled": True, "usePgbouncer": False}}},
+ ],
+ )
+ def
test_default_keda_db_connection_pgbouncer_enabled_usePgbouncer_false(self,
workers_values):
"""Verify keda db connection when pgbouncer is enabled and
usePgbouncer is false."""
import base64
docs = render_chart(
values={
- "workers": {"keda": {"enabled": True, "usePgbouncer": False}},
+ "workers": workers_values,
"executor": "CeleryExecutor",
"pgbouncer": {"enabled": True},
},
@@ -309,14 +399,21 @@ class TestKeda:
)
assert autoscaler_connection_env_var == "KEDA_DB_CONN"
- def test_mysql_keda_db_connection(self):
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"enabled": True}},
+ {"celery": {"keda": {"enabled": True}}},
+ ],
+ )
+ def test_mysql_keda_db_connection(self, workers_values):
"""Verify keda db connection when pgbouncer is enabled."""
import base64
docs = render_chart(
values={
"data": {"metadataConnection": {"protocol": "mysql", "port":
3306}},
- "workers": {"keda": {"enabled": True}},
+ "workers": workers_values,
"executor": "CeleryExecutor",
},
show_only=[
@@ -367,8 +464,7 @@ class TestKeda:
docs = render_chart(
values={
"workers": {
- "keda": {"enabled": True},
- "celery": {"queue": queue},
+ "celery": {"keda": {"enabled": True}, "queue": queue},
},
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
@@ -380,3 +476,67 @@ class TestKeda:
expected_query = self.build_query(executor="CeleryExecutor",
worker_queues=queue)
assert query == expected_query
+
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"pollingInterval": 10}, "celery": {"keda": {"enabled":
True}}},
+ {"celery": {"keda": {"enabled": True, "pollingInterval": 10}}},
+ {"keda": {"pollingInterval": 1}, "celery": {"keda": {"enabled":
True, "pollingInterval": 10}}},
+ ],
+ )
+ def test_overwrite_keda_pooling_interval(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+
+ assert jmespath.search("spec.pollingInterval", docs[0]) == 10
+
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"cooldownPeriod": 10}, "celery": {"keda": {"enabled":
True}}},
+ {"celery": {"keda": {"enabled": True, "cooldownPeriod": 10}}},
+ {"keda": {"cooldownPeriod": 1}, "celery": {"keda": {"enabled":
True, "cooldownPeriod": 10}}},
+ ],
+ )
+ def test_overwrite_keda_cooldown_period(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+
+ assert jmespath.search("spec.cooldownPeriod", docs[0]) == 10
+
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"minReplicaCount": 10}, "celery": {"keda": {"enabled":
True}}},
+ {"celery": {"keda": {"enabled": True, "minReplicaCount": 10}}},
+ {"keda": {"minReplicaCount": 1}, "celery": {"keda": {"enabled":
True, "minReplicaCount": 10}}},
+ ],
+ )
+ def test_overwrite_keda_min_replica_count(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+
+ assert jmespath.search("spec.minReplicaCount", docs[0]) == 10
+
+ @pytest.mark.parametrize(
+ "workers_values",
+ [
+ {"keda": {"maxReplicaCount": 5}, "celery": {"keda": {"enabled":
True}}},
+ {"celery": {"keda": {"enabled": True, "maxReplicaCount": 5}}},
+ {"keda": {"maxReplicaCount": 1}, "celery": {"keda": {"enabled":
True, "maxReplicaCount": 5}}},
+ ],
+ )
+ def test_overwrite_keda_max_replica_count(self, workers_values):
+ docs = render_chart(
+ values={"workers": workers_values},
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+
+ assert jmespath.search("spec.maxReplicaCount", docs[0]) == 5
diff --git a/helm-tests/tests/helm_tests/other/test_pgbouncer.py
b/helm-tests/tests/helm_tests/other/test_pgbouncer.py
index df6dbe66e08..f8b3885cc94 100644
--- a/helm-tests/tests/helm_tests/other/test_pgbouncer.py
+++ b/helm-tests/tests/helm_tests/other/test_pgbouncer.py
@@ -869,97 +869,88 @@ class TestPgbouncerNetworkPolicy:
assert jmespath.search("metadata.name", docs[0]) ==
"release-name-pgbouncer-policy"
@pytest.mark.parametrize(
- ("conf", "expected_selector"),
+ "values",
[
- # test with workers.keda enabled without namespace labels
- (
- {"executor": "CeleryExecutor", "workers": {"keda": {"enabled":
True}}},
- [{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
- ),
- # test with triggerer.keda enabled without namespace labels
- (
- {"triggerer": {"keda": {"enabled": True}}},
- [{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
- ),
- # test with workers.keda and triggerer.keda both enabled without
namespace labels
- (
- {
- "executor": "CeleryExecutor",
- "workers": {"keda": {"enabled": True}},
- "triggerer": {"keda": {"enabled": True}},
+ {"executor": "CeleryExecutor", "workers": {"keda": {"enabled":
True}}},
+ {"triggerer": {"keda": {"enabled": True}}},
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"keda": {"enabled": True}},
+ "triggerer": {"keda": {"enabled": True}},
+ },
+ ],
+ )
+ def test_pod_selectors_with_keda_without_namespace_labels(self, values):
+ docs = render_chart(
+ values={
+ "pgbouncer": {"enabled": True},
+ "networkPolicies": {"enabled": True},
+ **values,
+ },
+ show_only=["templates/pgbouncer/pgbouncer-networkpolicy.yaml"],
+ )
+
+ assert jmespath.search("spec.ingress[0].from[1:]", docs[0]) == [
+ {"podSelector": {"matchLabels": {"app": "keda-operator"}}}
+ ]
+
+ @pytest.mark.parametrize(
+ "conf",
+ [
+ # test with workers.keda/workers.celery.keda enabled with
namespace labels
+ {
+ "executor": "CeleryExecutor",
+ "workers": {
+ "keda": {"namespaceLabels": {"app": "airflow"}},
+ "celery": {"keda": {"enabled": True}},
},
- [{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
- ),
- # test with workers.keda enabled with namespace labels
- (
- {
- "executor": "CeleryExecutor",
- "workers": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ },
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"celery": {"keda": {"enabled": True,
"namespaceLabels": {"app": "airflow"}}}},
+ },
+ {
+ "executor": "CeleryExecutor",
+ "workers": {
+ "keda": {"namespaceLabels": {"airflow": "app"}},
+ "celery": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
},
- [
- {
- "namespaceSelector": {"matchLabels": {"app":
"airflow"}},
- "podSelector": {"matchLabels": {"app":
"keda-operator"}},
- }
- ],
- ),
+ },
# test with triggerer.keda enabled with namespace labels
- (
- {"triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}}},
- [
- {
- "namespaceSelector": {"matchLabels": {"app":
"airflow"}},
- "podSelector": {"matchLabels": {"app":
"keda-operator"}},
- }
- ],
- ),
- # test with workers.keda and triggerer.keda both enabled with
namespace labels
- (
- {
- "executor": "CeleryExecutor",
- "workers": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
- "triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
- },
- [
- {
- "namespaceSelector": {"matchLabels": {"app":
"airflow"}},
- "podSelector": {"matchLabels": {"app":
"keda-operator"}},
- }
- ],
- ),
- # test with workers.keda and triggerer.keda both enabled workers
with namespace labels
- # and triggerer without namespace labels
- (
- {
- "executor": "CeleryExecutor",
- "workers": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
- "triggerer": {"keda": {"enabled": True}},
- },
- [
- {
- "namespaceSelector": {"matchLabels": {"app":
"airflow"}},
- "podSelector": {"matchLabels": {"app":
"keda-operator"}},
- }
- ],
- ),
- # test with workers.keda and triggerer.keda both enabled workers
without namespace labels
+ {"triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}}},
+ # test with workers.keda/workers.celery.keda and triggerer.keda
both enabled with namespace labels
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ "triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ },
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"celery": {"keda": {"enabled": True,
"namespaceLabels": {"app": "airflow"}}}},
+ "triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ },
+ # test with workers.keda/workers.celery.keda and triggerer.keda
both enabled workers
+ # with namespace labels and triggerer without namespace labels
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ "triggerer": {"keda": {"enabled": True}},
+ },
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"celery": {"keda": {"enabled": True,
"namespaceLabels": {"app": "airflow"}}}},
+ "triggerer": {"keda": {"enabled": True}},
+ },
+ # test with workers.celery.keda and triggerer.keda both enabled
workers without namespace labels
# and triggerer with namespace labels
- (
- {
- "executor": "CeleryExecutor",
- "workers": {"keda": {"enabled": True}},
- "triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
- },
- [
- {
- "namespaceSelector": {"matchLabels": {"app":
"airflow"}},
- "podSelector": {"matchLabels": {"app":
"keda-operator"}},
- }
- ],
- ),
+ {
+ "executor": "CeleryExecutor",
+ "workers": {"celery": {"keda": {"enabled": True}}},
+ "triggerer": {"keda": {"enabled": True, "namespaceLabels":
{"app": "airflow"}}},
+ },
],
)
- def test_pgbouncer_network_policy_with_keda(self, conf, expected_selector):
+ def test_pod_selectors_with_namespace_labels(self, conf):
docs = render_chart(
values={
"pgbouncer": {"enabled": True},
@@ -969,7 +960,12 @@ class TestPgbouncerNetworkPolicy:
show_only=["templates/pgbouncer/pgbouncer-networkpolicy.yaml"],
)
- assert expected_selector ==
jmespath.search("spec.ingress[0].from[1:]", docs[0])
+ assert jmespath.search("spec.ingress[0].from[1:]", docs[0]) == [
+ {
+ "namespaceSelector": {"matchLabels": {"app": "airflow"}},
+ "podSelector": {"matchLabels": {"app": "keda-operator"}},
+ }
+ ]
class TestPgbouncerIngress: