piotrlinski commented on code in PR #61527: URL: https://github.com/apache/airflow/pull/61527#discussion_r2787620355
########## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py: ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets.""" + +from __future__ import annotations + +import base64 +from functools import cached_property +from pathlib import Path + +from kubernetes.client import ApiClient, CoreV1Api +from kubernetes.config import load_incluster_config + +from airflow.exceptions import AirflowException +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + +class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin): + """ + Retrieve connections, variables, and configs from Kubernetes Secrets using labels. + + This backend discovers secrets by querying Kubernetes labels, enabling integration + with External Secrets Operator (ESO), Sealed Secrets, or any tool that creates + Kubernetes secrets — regardless of the secret's name. + + Configurable via ``airflow.cfg``: + + .. 
code-block:: ini + + [secrets] + backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend + backend_kwargs = {"namespace": "airflow", "connections_label": "airflow.apache.org/connection-name"} + + The secret must have a label whose key matches the configured label and whose value + matches the requested identifier (conn_id, variable key, or config key). The actual + secret value is read from the ``value`` key in the secret's data. + + Example Kubernetes secret for a connection named ``my_db``: + + .. code-block:: yaml + + apiVersion: v1 + kind: Secret + metadata: + name: anything + labels: + airflow.apache.org/connection-name: my_db + data: + value: <base64-encoded-connection-uri> + + **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly + for in-cluster authentication. Does not use KubernetesHook or any Airflow connection, + avoiding circular dependencies since this IS the secrets backend. + The namespace can be set explicitly via ``backend_kwargs``. If not set, it is + auto-detected from the pod's service account metadata at + ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If auto-detection + fails (e.g. automountServiceAccountToken is disabled), an error is raised. + + **Performance:** Queries use ``resource_version="0"`` so the Kubernetes API server + serves results from its in-memory watch cache, making lookups very fast without + requiring Airflow-side caching. + + :param namespace: Kubernetes namespace to query for secrets. If not set, the + namespace is auto-detected from the pod's service account metadata. If + auto-detection fails, an ``AirflowException`` is raised. + :param connections_label: Label key used to discover connection secrets. + If set to None, requests for connections will not be sent to Kubernetes. + :param variables_label: Label key used to discover variable secrets. + If set to None, requests for variables will not be sent to Kubernetes. 
+ :param config_label: Label key used to discover config secrets. + If set to None, requests for configurations will not be sent to Kubernetes. + :param connections_data_key: The data key in the Kubernetes secret that holds the + connection value. Default: ``"value"`` + :param variables_data_key: The data key in the Kubernetes secret that holds the + variable value. Default: ``"value"`` + :param config_data_key: The data key in the Kubernetes secret that holds the + config value. Default: ``"value"`` + """ + + def __init__( + self, + namespace: str | None = None, + connections_label: str | None = "airflow.apache.org/connection-name", + variables_label: str | None = "airflow.apache.org/variable-name", + config_label: str | None = "airflow.apache.org/config-name", + connections_data_key: str = "value", + variables_data_key: str = "value", + config_data_key: str = "value", + **kwargs, + ): + super().__init__(**kwargs) + self._namespace = namespace + self.connections_label = connections_label + self.variables_label = variables_label + self.config_label = config_label + self.connections_data_key = connections_data_key + self.variables_data_key = variables_data_key + self.config_data_key = config_data_key + + @cached_property + def namespace(self) -> str: + """Return the configured namespace, or auto-detect from service account metadata.""" + if self._namespace: + return self._namespace + try: + return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip() + except FileNotFoundError: + raise AirflowException( + "Could not auto-detect Kubernetes namespace from " + "/var/run/secrets/kubernetes.io/serviceaccount/namespace. " + "Is automountServiceAccountToken disabled for this pod? " + "Set the 'namespace' parameter explicitly in backend_kwargs." 
+ ) + + @cached_property + def client(self) -> CoreV1Api: + """Lazy-init Kubernetes CoreV1Api client using in-cluster config directly.""" + load_incluster_config() + return CoreV1Api(ApiClient()) + + def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None: + """ + Get serialized representation of Connection from a Kubernetes secret. + + :param conn_id: connection id + :param team_name: Team name associated to the task trying to access the connection (if any) + """ + if self.connections_label is None: + return None + return self._get_secret_by_label(self.connections_label, conn_id, self.connections_data_key) + + def get_variable(self, key: str, team_name: str | None = None) -> str | None: + """ + Get Airflow Variable from a Kubernetes secret. + + :param key: Variable Key + :param team_name: Team name associated to the task trying to access the variable (if any) + :return: Variable Value + """ + if self.variables_label is None: + return None + return self._get_secret_by_label(self.variables_label, key, self.variables_data_key) + + def get_config(self, key: str) -> str | None: + """ + Get Airflow Configuration from a Kubernetes secret. + + :param key: Configuration Option Key + :return: Configuration Option Value + """ + if self.config_label is None: + return None + return self._get_secret_by_label(self.config_label, key, self.config_data_key) Review Comment: The documentation you linked is not accurate: the method is there, and I already use it with the HashiCorp secrets backend. See https://github.com/apache/airflow/blob/main/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py#L58 and https://github.com/apache/airflow/blob/main/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py#L246 . It is actually in the current docs as well: https://airflow.apache.org/docs/apache-airflow/3.1.7/_api/airflow/secrets/base_secrets/index.html (the link you provided points to v1.* of Airflow). -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
