Nataneljpwd commented on code in PR #61527:
URL: https://github.com/apache/airflow/pull/61527#discussion_r2790403962


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from 
Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using 
labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling 
integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that 
creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = 
airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": 
"airflow.apache.org/connection-name"}
+
+    The secret must have a label whose key matches the configured label and 
whose value
+    matches the requested identifier (conn_id, variable key, or config key). 
The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-name: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` 
directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow 
connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it 
is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If 
auto-detection
+    fails (e.g. automountServiceAccountToken is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes 
API server
+    serves results from its in-memory watch cache, making lookups very fast 
without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, 
the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to 
Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to 
Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that 
holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that 
holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds 
the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str | None = "airflow.apache.org/connection-name",
+        variables_label: str | None = "airflow.apache.org/variable-name",
+        config_label: str | None = "airflow.apache.org/config-name",

Review Comment:
   Maybe these should not be nullable?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from 
Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using 
labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling 
integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that 
creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = 
airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": 
"airflow.apache.org/connection-name"}
+
+    The secret must have a label whose key matches the configured label and 
whose value
+    matches the requested identifier (conn_id, variable key, or config key). 
The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-name: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` 
directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow 
connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it 
is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If 
auto-detection
+    fails (e.g. automountServiceAccountToken is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes 
API server
+    serves results from its in-memory watch cache, making lookups very fast 
without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, 
the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to 
Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to 
Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that 
holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that 
holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds 
the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str | None = "airflow.apache.org/connection-name",
+        variables_label: str | None = "airflow.apache.org/variable-name",
+        config_label: str | None = "airflow.apache.org/config-name",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.connections_label = connections_label
+        self.variables_label = variables_label
+        self.config_label = config_label
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Return the configured namespace, or auto-detect from service 
account metadata."""
+        if self._namespace:
+            return self._namespace
+        try:
+            return 
Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            raise AirflowException(
+                "Could not auto-detect Kubernetes namespace from "
+                "/var/run/secrets/kubernetes.io/serviceaccount/namespace. "
+                "Is automountServiceAccountToken disabled for this pod? "
+                "Set the 'namespace' parameter explicitly in backend_kwargs."
+            )

Review Comment:
   I think I have seen almost this exact piece of code elsewhere, I think we 
can use the k8s hook's get namespace method with the incluster option



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from 
Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using 
labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling 
integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that 
creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = 
airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": 
"airflow.apache.org/connection-name"}
+
+    The secret must have a label whose key matches the configured label and 
whose value
+    matches the requested identifier (conn_id, variable key, or config key). 
The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-name: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` 
directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow 
connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it 
is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If 
auto-detection
+    fails (e.g. automountServiceAccountToken is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes 
API server
+    serves results from its in-memory watch cache, making lookups very fast 
without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, 
the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to 
Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to 
Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that 
holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that 
holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds 
the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str | None = "airflow.apache.org/connection-name",
+        variables_label: str | None = "airflow.apache.org/variable-name",
+        config_label: str | None = "airflow.apache.org/config-name",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.connections_label = connections_label
+        self.variables_label = variables_label
+        self.config_label = config_label
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Return the configured namespace, or auto-detect from service 
account metadata."""
+        if self._namespace:
+            return self._namespace
+        try:
+            return 
Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            raise AirflowException(
+                "Could not auto-detect Kubernetes namespace from "
+                "/var/run/secrets/kubernetes.io/serviceaccount/namespace. "
+                "Is automountServiceAccountToken disabled for this pod? "
+                "Set the 'namespace' parameter explicitly in backend_kwargs."
+            )
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config 
directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> 
str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        :param conn_id: connection id
+        :param team_name: Team name associated to the task trying to access 
the connection (if any)
+        """
+        if self.connections_label is None:
+            return None
+        return self._get_secret_by_label(self.connections_label, conn_id, 
self.connections_data_key)

Review Comment:
   I am not sure if I want his piece of code to be extracted as it is repeated 
a few times yet I do not see enough logic done here to separate it out, what do 
you think?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from 
Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using 
labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling 
integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that 
creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = 
airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": 
"airflow.apache.org/connection-name"}
+
+    The secret must have a label whose key matches the configured label and 
whose value
+    matches the requested identifier (conn_id, variable key, or config key). 
The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-name: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` 
directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow 
connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it 
is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If 
auto-detection
+    fails (e.g. automountServiceAccountToken is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes 
API server
+    serves results from its in-memory watch cache, making lookups very fast 
without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, 
the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to 
Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to 
Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that 
holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that 
holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds 
the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str | None = "airflow.apache.org/connection-name",
+        variables_label: str | None = "airflow.apache.org/variable-name",
+        config_label: str | None = "airflow.apache.org/config-name",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.connections_label = connections_label
+        self.variables_label = variables_label
+        self.config_label = config_label
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Return the configured namespace, or auto-detect from service 
account metadata."""
+        if self._namespace:
+            return self._namespace
+        try:
+            return 
Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            raise AirflowException(
+                "Could not auto-detect Kubernetes namespace from "
+                "/var/run/secrets/kubernetes.io/serviceaccount/namespace. "
+                "Is automountServiceAccountToken disabled for this pod? "
+                "Set the 'namespace' parameter explicitly in backend_kwargs."
+            )
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config 
directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> 
str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        :param conn_id: connection id
+        :param team_name: Team name associated to the task trying to access 
the connection (if any)
+        """
+        if self.connections_label is None:
+            return None
+        return self._get_secret_by_label(self.connections_label, conn_id, 
self.connections_data_key)
+
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
+        """
+        Get Airflow Variable from a Kubernetes secret.
+
+        :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
+        :return: Variable Value
+        """
+        if self.variables_label is None:
+            return None
+        return self._get_secret_by_label(self.variables_label, key, 
self.variables_data_key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Get Airflow Configuration from a Kubernetes secret.
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value
+        """
+        if self.config_label is None:
+            return None
+        return self._get_secret_by_label(self.config_label, key, 
self.config_data_key)

Review Comment:
   The more you know, thank you!



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from 
Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using 
labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling 
integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that 
creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = 
airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": 
"airflow.apache.org/connection-name"}
+
+    The secret must have a label whose key matches the configured label and 
whose value
+    matches the requested identifier (conn_id, variable key, or config key). 
The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-name: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` 
directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow 
connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it 
is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If 
auto-detection
+    fails (e.g. automountServiceAccountToken is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes 
API server
+    serves results from its in-memory watch cache, making lookups very fast 
without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, 
the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to 
Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to 
Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that 
holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that 
holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds 
the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str | None = "airflow.apache.org/connection-name",
+        variables_label: str | None = "airflow.apache.org/variable-name",
+        config_label: str | None = "airflow.apache.org/config-name",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.connections_label = connections_label
+        self.variables_label = variables_label
+        self.config_label = config_label
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Return the configured namespace, or auto-detect from service 
account metadata."""
+        if self._namespace:
+            return self._namespace
+        try:
+            return 
Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            raise AirflowException(
+                "Could not auto-detect Kubernetes namespace from "
+                "/var/run/secrets/kubernetes.io/serviceaccount/namespace. "
+                "Is automountServiceAccountToken disabled for this pod? "
+                "Set the 'namespace' parameter explicitly in backend_kwargs."
+            )
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config 
directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> 
str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        :param conn_id: connection id
+        :param team_name: Team name associated to the task trying to access 
the connection (if any)
+        """
+        if self.connections_label is None:
+            return None
+        return self._get_secret_by_label(self.connections_label, conn_id, 
self.connections_data_key)
+
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
+        """
+        Get Airflow Variable from a Kubernetes secret.
+
+        :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
+        :return: Variable Value
+        """
+        if self.variables_label is None:
+            return None
+        return self._get_secret_by_label(self.variables_label, key, 
self.variables_data_key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Get Airflow Configuration from a Kubernetes secret.
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value
+        """
+        if self.config_label is None:
+            return None
+        return self._get_secret_by_label(self.config_label, key, 
self.config_data_key)
+
+    def _get_secret_by_label(self, label_key: str, label_value: str, data_key: 
str) -> str | None:

Review Comment:
   Maybe a better name for the method would be just `_get_secret`? As the 
required label key and value make it quite obvious on how we will search the 
secret



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to