o-nikolas commented on code in PR #61798:
URL: https://github.com/apache/airflow/pull/61798#discussion_r2824200609


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py:
##########
@@ -16,71 +16,95 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.settings import AIRFLOW_HOME
 
+if TYPE_CHECKING:
+    from airflow.executors.base_executor import ExecutorConf
+
 
 class KubeConfig:
-    """Configuration for Kubernetes."""
+    """
+    Configuration for Kubernetes.
+
+    :param executor_conf: Optional team-aware configuration object. If not 
provided,
+        falls back to the global configuration for backwards compatibility. 
This parameter
+        supports the multi-team feature introduced in AIP-67.
+    """
 
     core_section = "core"
     kubernetes_section = "kubernetes_executor"
     logging_section = "logging"
 
-    def __init__(self):
+    def __init__(self, executor_conf: ExecutorConf | None = None):

Review Comment:
   I think there are a couple of usages of this class that need corresponding 
updates to pass in the team-aware `executor_conf`.
   
   This [one 
here](https://github.com/apache/airflow/blob/05c3386ef4fe15d6b11bd9edeb49c63bb21b38b1/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py#L37)
 has access to the TI; you can use that to fetch the team name, construct a 
team-aware `ExecutorConf`, and pass that in here (or, optionally, this class 
can just accept the team name and construct the config itself).
   
   This [one 
here](https://github.com/apache/airflow/blob/05c3386ef4fe15d6b11bd9edeb49c63bb21b38b1/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py#L75)
 is one I'm a little on the fence about. It probably doesn't need to be 
team-aware as of now, but if someone uses it assuming that it is, it could 
cause some test issues. Curious to hear your thoughts on that one.



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -1932,3 +1932,163 @@ def 
test_kube_config_parse_worker_pod_pending_fatal_container_state_reasons(
             executor = KubernetesExecutor()
 
         assert 
executor.kube_config.worker_pod_pending_fatal_container_state_reasons == 
expected_result
+
+
+class TestKubernetesExecutorMultiTeam:
+    """Tests for AIP-67 multi-team support in KubernetesExecutor."""
+
+    def test_supports_multi_team(self):
+        """Test that KubernetesExecutor declares multi-team support."""
+        assert KubernetesExecutor.supports_multi_team is True
+
+    def test_global_executor_without_team_name(self):
+        """Test that global executor (no team) works correctly with backwards 
compatibility."""
+        executor = KubernetesExecutor()
+
+        # Verify executor has conf
+        assert hasattr(executor, "conf")
+        # On older Airflow versions, conf is the global AirflowConfigParser 
(no team_name attr).
+        # On newer versions, conf is an ExecutorConf with team_name=None.
+        assert getattr(executor.conf, "team_name", None) is None
+
+        # Verify KubeConfig was created with the executor's conf
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_executor_with_team_name(self):
+        """Test that executor created with a team_name has team-specific 
conf."""
+        executor = KubernetesExecutor(team_name="ml_team")
+
+        assert executor.conf.team_name == "ml_team"
+        assert executor.team_name == "ml_team"
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_multiple_team_executors_isolation(self, monkeypatch):
+        """Test that multiple team executors can coexist with isolated 
resources."""
+        # Set team-specific parallelism via env vars so KubeConfig picks them 
up
+        # parallelism is read from the 'core' section
+        monkeypatch.setenv("AIRFLOW__TEAM_A___CORE__PARALLELISM", "2")
+        monkeypatch.setenv("AIRFLOW__TEAM_B___CORE__PARALLELISM", "3")

Review Comment:
   Can you please update this to use a configuration that is specific to the 
Kubernetes executor? As of now, users should not modify `core.parallelism` per 
team. That config should be consistent: the scheduler reads it and assumes it 
will be the same across executors, and scheduling logic depends on it. I'll 
think about putting in some logic to restrict teams from setting this config.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py:
##########
@@ -16,71 +16,95 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.settings import AIRFLOW_HOME
 
+if TYPE_CHECKING:
+    from airflow.executors.base_executor import ExecutorConf
+
 
 class KubeConfig:
-    """Configuration for Kubernetes."""
+    """
+    Configuration for Kubernetes.
+
+    :param executor_conf: Optional team-aware configuration object. If not 
provided,
+        falls back to the global configuration for backwards compatibility. 
This parameter
+        supports the multi-team feature introduced in AIP-67.
+    """
 
     core_section = "core"
     kubernetes_section = "kubernetes_executor"
     logging_section = "logging"
 
-    def __init__(self):
+    def __init__(self, executor_conf: ExecutorConf | None = None):
+        # Use the provided executor_conf for team-aware configuration, or fall 
back to global conf
+        # for backwards compatibility with older versions of Airflow.
+        self._conf = executor_conf if executor_conf is not None else conf
+
         configuration_dict = conf.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict[self.core_section]
         self.airflow_home = AIRFLOW_HOME
-        self.dags_folder = conf.get(self.core_section, "dags_folder")
-        self.parallelism = conf.getint(self.core_section, "parallelism")
-        self.pod_template_file = conf.get(self.kubernetes_section, 
"pod_template_file", fallback=None)
+        self.dags_folder = self._conf.get(self.core_section, "dags_folder")
+        self.parallelism = self._conf.getint(self.core_section, "parallelism")
+        self.pod_template_file = self._conf.get(self.kubernetes_section, 
"pod_template_file", fallback=None)
 
-        self.delete_worker_pods = conf.getboolean(self.kubernetes_section, 
"delete_worker_pods")
-        self.delete_worker_pods_on_failure = conf.getboolean(
+        self.delete_worker_pods = 
self._conf.getboolean(self.kubernetes_section, "delete_worker_pods")
+        self.delete_worker_pods_on_failure = self._conf.getboolean(
             self.kubernetes_section, "delete_worker_pods_on_failure"
         )
-        self.worker_pod_pending_fatal_container_state_reasons = []
-        if conf.get(self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons", fallback=""):
+        self.worker_pod_pending_fatal_container_state_reasons: list[str] = []
+        fatal_reasons = self._conf.get(
+            self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons", fallback=""
+        )
+        if fatal_reasons:

Review Comment:
   Nice optimization, the code was strange before with the double config read...



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -79,14 +79,29 @@ class KubernetesExecutor(BaseExecutor):
 
     RUNNING_POD_LOG_LINES = 100
     supports_ad_hoc_ti_run: bool = True
+    supports_multi_team: bool = True
 
     if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
         # In the v3 path, we store workloads, not commands as strings.
         # TODO: TaskSDK: move this type change into BaseExecutor
         queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: 
ignore[assignment]
 
-    def __init__(self):
-        self.kube_config = KubeConfig()
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Check if self has the ExecutorConf set on the self.conf attribute 
with all required methods.
+        # In older Airflow versions, ExecutorConf exists but lacks methods 
like getint, getboolean, etc.
+        # In such cases, fall back to the global configuration object.
+        # This allows the changes to be backwards compatible with older 
versions of Airflow.
+        # Can be removed when minimum supported provider version is equal to 
the version of core airflow
+        # which introduces multi-team configuration (3.2+).
+        if not hasattr(self, "conf") or not hasattr(self.conf, "getint"):
+            self.conf = conf
+
+        self.kube_config = KubeConfig(executor_conf=self.conf)

Review Comment:
   It's nice that kube_config is used in most places; it simplifies the changes 
in this class 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to