jykae commented on code in PR #68074: URL: https://github.com/apache/airflow/pull/68074#discussion_r3387477354
########## chart/files/db_migrate.py: ########## @@ -0,0 +1,378 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Bidirectional Airflow metadata DB reconciliation for the helm chart. + +Decides at runtime whether the helm release wants a forward migrate, a +downgrade, or a no-op, and runs the right command: + +* target == current -> no-op (idempotent check) +* target > current -> ``airflow db migrate`` inside this job's container + (uses the TARGET image, which ships forward scripts). +* target < current -> ``airflow db downgrade --to-version <target>`` + executed inside the still-running api-server pod (the OLD image still + ships the reverse scripts), followed by scaling every DB-touching + workload (api-server, scheduler, triggerer, dag-processor, worker) to + zero so that no OLD pod keeps talking to the now-downgraded schema. Helm + then patches those workloads back to ``replicas: N`` with the TARGET + image as the upgrade proceeds, so the cluster comes back up cleanly on + the target version. This means a downgrade trades the otherwise-broken + rolling-update window for a brief outage (which is unavoidable when the + schema goes backwards). + +Required env: + +* ``AIRFLOW_TARGET_VERSION`` - the version the chart is being upgraded/installed to. +* ``POD_NAMESPACE`` - release namespace, injected via downward API. +* ``RELEASE_NAME`` - the helm release name, used to scope the scale-down to + only the workloads owned by this release. + +Optional env: + +* ``MIGRATE_JOB_DRAIN_TIMEOUT_SECONDS`` - how long to wait for DB-touching + pods to terminate after scale-to-zero on a downgrade. Defaults to 300. + Increase when long-running worker tasks need more time to wind down. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +import time + +from alembic.migration import MigrationContext +from packaging.version import InvalidVersion, Version +from sqlalchemy import text +from sqlalchemy.exc import OperationalError +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + stop_after_delay, + wait_exponential, + wait_fixed, +) + +from airflow.settings import engine + +# The kubernetes client is only used by the downgrade branch (exec ``airflow db +# downgrade`` in the api-server pod, then scale DB-touching workloads to zero). +# The base Airflow image does not ship the kubernetes client unless the +# cncf.kubernetes provider is installed, so importing it unconditionally would +# break the far more common forward / no-op / fresh paths. Import it lazily and +# only fail when a downgrade is actually selected. +try: + from kubernetes import client, config as k8s_config + from kubernetes.stream import stream + + KUBERNETES_IMPORT_ERROR: ImportError | None = None +except ImportError as exc: # pragma: no cover - only on images without the k8s client + client = k8s_config = stream = None # type: ignore[assignment] + KUBERNETES_IMPORT_ERROR = exc + +# ``_REVISION_HEADS_MAP`` is private today; a public accessor is being tracked +# upstream so the chart can drop the leading underscore in a future release. +from airflow.utils.db import _REVISION_HEADS_MAP + + +def _int_env(name: str, default: int) -> int: + """Return integer env var *name*, or *default* when unset. + + These knobs come from chart config; a typo (e.g. ``"5m"``) should fail with + an actionable message rather than an opaque ``ValueError`` traceback deep in + the job logs. + """ + raw = os.environ.get(name) + if raw is None: + return default + try: + return int(raw) + except ValueError: + raise SystemExit(f"{name} must be an integer number of seconds, got {raw!r}") + + +def _resolve_target_rev(target: str) -> str | None: + """Return the alembic head for *target*, falling back to the nearest lower mapped version. + + ``_REVISION_HEADS_MAP`` does not list every patch release — patches often + share the head of their minor's first release. Mirror Airflow's own CLI + behaviour by picking the highest mapped version that is ``<= target``. + """ + if target in _REVISION_HEADS_MAP: + return _REVISION_HEADS_MAP[target] + try: + target_v = Version(target) + except InvalidVersion: + # ``AIRFLOW_TARGET_VERSION`` comes from chart config; a nonstandard or + # misconfigured ``airflowVersion`` (e.g. a dev build) is not PEP 440 + # parseable. Treat it as an unknown target so :func:`decide_action` + # falls back to a conservative forward migrate instead of crashing. + return None + candidates = [(Version(v), rev) for v, rev in _REVISION_HEADS_MAP.items() if Version(v) <= target_v] + if not candidates: + return None + return max(candidates, key=lambda pair: pair[0])[1] + + +def _db_connect_stop(retry_state): + # Evaluate ``DB_CONNECT_MAX_WAIT_SECONDS`` at retry time (not import time) + # so that operators -- and unit tests -- can tune the wait window without + # reloading the module. ``/entrypoint`` skips its DB-wait for non-airflow + # commands (we run as ``python -c ...``), so on a fresh install with a + # bundled postgres still starting, the first connect attempt races the DB. + # Default 120s matches the entrypoint's + # ``CONNECTION_CHECK_MAX_COUNT`` * ``CONNECTION_CHECK_SLEEP_TIME``. + delay = _int_env("DB_CONNECT_MAX_WAIT_SECONDS", 120) + return stop_after_delay(delay)(retry_state) + + +@retry( + stop=_db_connect_stop, + wait=wait_fixed(3), + retry=retry_if_exception_type(OperationalError), + reraise=True, +) +def _wait_for_db_ready() -> None: + """Block until the metadata DB accepts a connection. + + Called once at the top of :func:`main` so that *every* downstream step + (``decide_action``, ``airflow db migrate`` subprocess, ``kubernetes`` + api-server pod exec) can assume the DB is reachable. Without this, the + subprocess branch had no DB-wait of its own and would fail immediately + on a fresh install where the bundled postgres was still initialising, + causing ``BackoffLimitExceeded`` on the Job. + """ + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + + +def decide_action(target: str) -> str: + """Return one of ``noop``, ``forward``, ``downgrade``, ``fresh``. + + Assumes the DB is already reachable -- :func:`_wait_for_db_ready` must + have been called first. + """ + target_rev = _resolve_target_rev(target) + if target_rev is None: + # Unknown target version (e.g. dev build). Be conservative: forward only. + return "forward" + + with engine.connect() as conn: + current_rev = MigrationContext.configure(conn).get_current_revision() + + if current_rev is None: + return "fresh" + if current_rev == target_rev: + return "noop" + + # Reverse-lookup current_rev to determine which version it belongs to, + # then compare versions. If current_rev isn't mapped (dev/intermediate + # alembic revision) be conservative and forward-migrate rather than risk + # an incorrect downgrade. + rev_to_version = {rev: ver for ver, rev in _REVISION_HEADS_MAP.items()} + current_version = rev_to_version.get(current_rev) + if current_version is None: + return "forward" + if Version(current_version) > Version(target): + return "downgrade" + return "forward" + + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=2, min=2, max=30), + retry=retry_if_exception_type(RuntimeError), + reraise=True, +) +def discover_api_server_pod(namespace: str) -> str: + """Return the name of a Running api-server pod in *namespace*. + + Retries on ``RuntimeError`` so a rolling restart of the api-server + deployment (no Running pod for a few seconds) does not fail the job. + """ + k8s_config.load_incluster_config() + api = client.CoreV1Api() + pods = api.list_namespaced_pod( + namespace=namespace, + label_selector="component=api-server", + field_selector="status.phase=Running", + ).items Review Comment: Fixed — `discover_api_server_pod()` now takes `release_name` and filters on `release=<release_name>,component=api-server`, so when several Airflow releases share a namespace the downgrade is exec'd strictly in the current release's api-server pod. `RELEASE_NAME` is already required/validated on the downgrade branch, and the tests assert the release-scoped selector. ########## chart/tests/helm_tests/airflow_aux/test_migrate_database_job_rbac.py: ########## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Tests for the RBAC granted to the migrate-database-job. + +The migrate-database-job needs ``pods`` + ``pods/exec`` access only on the +downgrade branch, where it must exec ``airflow db downgrade`` inside the +still-running api-server pod (whose image still ships the reverse alembic +scripts). The Role is always rendered so the forward-migrate path remains +identical for users who only ever upgrade. + +Tracked in https://github.com/apache/airflow/issues/68072. +""" + +from __future__ import annotations + +import jmespath +import pytest +from chart_utils.helm_template_generator import render_chart + +ROLE_TEMPLATE = "templates/rbac/migrate-database-job-role.yaml" +ROLEBINDING_TEMPLATE = "templates/rbac/migrate-database-job-rolebinding.yaml" + + +class TestMigrateDatabaseJobRBAC: + def test_role_and_binding_render_by_default(self): + docs = render_chart(show_only=[ROLE_TEMPLATE, ROLEBINDING_TEMPLATE]) + kinds = sorted(d["kind"] for d in docs) + assert kinds == ["Role", "RoleBinding"] + + @pytest.mark.parametrize("rbac_create", [False, True]) + def test_gated_on_rbac_create(self, rbac_create): + docs = render_chart( + values={"rbac": {"create": rbac_create}}, + show_only=[ROLE_TEMPLATE, ROLEBINDING_TEMPLATE], + ) + assert bool(docs) is rbac_create + + def test_role_rules_grant_pods_and_exec(self): + docs = render_chart(show_only=[ROLE_TEMPLATE]) + rules = jmespath.search("rules", docs[0]) + resources_to_verbs = {tuple(r["resources"]): set(r["verbs"]) for r in rules} + assert ("pods",) in resources_to_verbs + assert {"get", "list"}.issubset(resources_to_verbs[("pods",)]) + assert ("pods/exec",) in resources_to_verbs + # Exec is authorized under ``create`` or ``get`` depending on the + # apiserver/client path, so the role grants both for compatibility. + assert resources_to_verbs[("pods/exec",)] == {"create", "get"} + + def test_role_rules_grant_deployments_and_statefulsets_scale(self): + # The downgrade branch scales every DB-touching workload to 0 after + # running ``airflow db downgrade`` so no OLD code keeps querying the + # now-downgraded schema before helm rolls in TARGET pods. + docs = render_chart(show_only=[ROLE_TEMPLATE]) + rules = jmespath.search("rules", docs[0]) + resources_to_verbs = {tuple(r["resources"]): set(r["verbs"]) for r in rules} + assert ("deployments", "statefulsets") in resources_to_verbs + assert {"get", "list"}.issubset(resources_to_verbs[("deployments", "statefulsets")]) + assert ("deployments/scale", "statefulsets/scale") in resources_to_verbs + # Only ``patch`` is needed -- the reconciler never reads the scale back. + assert resources_to_verbs[("deployments/scale", "statefulsets/scale")] == {"patch"} + + def test_rolebinding_subject_is_migrate_db_job_sa(self): + docs = render_chart(show_only=[ROLEBINDING_TEMPLATE]) + subjects = jmespath.search("subjects", docs[0]) + assert len(subjects) == 1 + assert subjects[0]["kind"] == "ServiceAccount" + # Matches the name rendered by templates/jobs/migrate-database-job-serviceaccount.yaml + # via the "migrateDatabaseJob.serviceAccountName" helper. + assert subjects[0]["name"] == "release-name-airflow-migrate-database-job" + + def test_rolebinding_references_role(self): + docs = render_chart(show_only=[ROLEBINDING_TEMPLATE]) + role_ref = jmespath.search("roleRef", docs[0]) + assert role_ref["kind"] == "Role" + assert role_ref["name"] == "release-name-migrate-database-job-role" + assert role_ref["apiGroup"] == "rbac.authorization.k8s.io" Review Comment: This assertion already matches the rendered output, so no change is needed. Under the chart default (`useStandardNaming=false`) `airflow.fullname` resolves to just the release name, so both the `Role` (`metadata.name`) and the `RoleBinding` `roleRef.name` render as `release-name-migrate-database-job-role` from the same helper — the test passes and the binding stays internally consistent. (`useStandardNaming=true` would shift both names together to `release-name-airflow-...`.) ########## chart/templates/jobs/migrate-database-job.yaml: ########## @@ -103,8 +103,18 @@ spec: {{- if .Values.migrateDatabaseJob.command }} command: {{- tpl (toYaml .Values.migrateDatabaseJob.command) . | nindent 12 }} {{- end }} - {{- if .Values.migrateDatabaseJob.args }} + {{- if not (kindIs "invalid" .Values.migrateDatabaseJob.args) }} args: {{- tpl (toYaml .Values.migrateDatabaseJob.args) . | nindent 12 }} + {{- else }} + # Bidirectional metadata DB reconciliation: forward migrate, no-op, + # fresh install, or exec downgrade in the still-running api-server + # pod depending on the relationship between the chart's target + # version and the DB's current alembic head. + args: + - "python" + - "-c" + - |- + {{- .Files.Get "files/db_migrate.py" | nindent 14 }} Review Comment: Keeping `{{-` here intentionally — this is the standard chart idiom for embedding a file into a block scalar (`{{- .Files.Get ... | nindent N }}`), used throughout the chart. `nindent` prepends its own newline, so the `{{-` left-trim removes the template's literal indentation/newline and the content lands correctly. Dropping the `-` would instead emit a stray whitespace-only first line inside the `python -c` block. Verified the rendered `|-` block starts directly on the license header with correct 14-space indentation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
