o-nikolas commented on code in PR #61798:
URL: https://github.com/apache/airflow/pull/61798#discussion_r2824200609
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py:
##########
@@ -16,71 +16,95 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.settings import AIRFLOW_HOME
+if TYPE_CHECKING:
+    from airflow.executors.base_executor import ExecutorConf
+
class KubeConfig:
- """Configuration for Kubernetes."""
+ """
+ Configuration for Kubernetes.
+
+ :param executor_conf: Optional team-aware configuration object. If not
provided,
+ falls back to the global configuration for backwards compatibility.
This parameter
+ supports the multi-team feature introduced in AIP-67.
+ """
core_section = "core"
kubernetes_section = "kubernetes_executor"
logging_section = "logging"
- def __init__(self):
+ def __init__(self, executor_conf: ExecutorConf | None = None):
Review Comment:
There are a couple of usages of this class that I think need corresponding updates to pass in the team config.
This [one here](https://github.com/apache/airflow/blob/05c3386ef4fe15d6b11bd9edeb49c63bb21b38b1/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py#L37) has access to the TI, so you can use that to fetch the team name and construct a team-aware ExecutorConf to pass in here (or, optionally, this class can just accept the team name and construct the config itself).
This [one here](https://github.com/apache/airflow/blob/05c3386ef4fe15d6b11bd9edeb49c63bb21b38b1/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py#L75) is maybe a little more on the fence. It probably doesn't need to be team-aware as of now, but if someone uses it thinking that it is, it could cause some test issues. Curious to hear your thoughts on that one.
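Roughly what I have in mind for the template_rendering.py call site (sketch only; `get_team_name_for_ti` and the `ExecutorConf` constructor shape are placeholders for whatever API lands in core, not the actual interface):

```python
# Hypothetical sketch: how the template_rendering.py call site could build a
# team-aware KubeConfig. Helper name and ExecutorConf signature are placeholders.
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig


def render_k8s_pod_yaml(task_instance):
    team_name = get_team_name_for_ti(task_instance)  # placeholder helper
    executor_conf = ExecutorConf(team_name=team_name) if team_name else None  # assumed ctor
    kube_config = KubeConfig(executor_conf=executor_conf)
    ...
```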
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -1932,3 +1932,163 @@ def test_kube_config_parse_worker_pod_pending_fatal_container_state_reasons(
        executor = KubernetesExecutor()
        assert executor.kube_config.worker_pod_pending_fatal_container_state_reasons == expected_result
+
+
+class TestKubernetesExecutorMultiTeam:
+    """Tests for AIP-67 multi-team support in KubernetesExecutor."""
+
+    def test_supports_multi_team(self):
+        """Test that KubernetesExecutor declares multi-team support."""
+        assert KubernetesExecutor.supports_multi_team is True
+
+    def test_global_executor_without_team_name(self):
+        """Test that the global executor (no team) works correctly with backwards compatibility."""
+        executor = KubernetesExecutor()
+
+        # Verify the executor has conf
+        assert hasattr(executor, "conf")
+        # On older Airflow versions, conf is the global AirflowConfigParser (no team_name attr).
+        # On newer versions, conf is an ExecutorConf with team_name=None.
+        assert getattr(executor.conf, "team_name", None) is None
+
+        # Verify KubeConfig was created with the executor's conf
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires Airflow 3.2+")
+    def test_executor_with_team_name(self):
+        """Test that an executor created with a team_name has team-specific conf."""
+        executor = KubernetesExecutor(team_name="ml_team")
+
+        assert executor.conf.team_name == "ml_team"
+        assert executor.team_name == "ml_team"
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires Airflow 3.2+")
+    def test_multiple_team_executors_isolation(self, monkeypatch):
+        """Test that multiple team executors can coexist with isolated resources."""
+        # Set team-specific parallelism via env vars so KubeConfig picks them up;
+        # parallelism is read from the 'core' section.
+        monkeypatch.setenv("AIRFLOW__TEAM_A___CORE__PARALLELISM", "2")
+        monkeypatch.setenv("AIRFLOW__TEAM_B___CORE__PARALLELISM", "3")
Review Comment:
Can you update this to use a configuration option that is specific to the Kubernetes executor, please? As of now, users should not modify `core.parallelism` per team. That config should be consistent: the scheduler reads it and assumes it is consistent across executors, and scheduling logic depends on it. I'll think about putting in some logic to restrict teams from setting this config.
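For example, the test could exercise an option that only the Kubernetes executor reads, such as `worker_pods_creation_batch_size` (sketch; it follows the same `AIRFLOW__TEAM_X___SECTION__KEY` env-var pattern already used above, and assumes KubeConfig keeps exposing the value under the same attribute name):

```python
# Sketch: swap core.parallelism for a kubernetes_executor-specific option.
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "2")
monkeypatch.setenv("AIRFLOW__TEAM_B___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "3")

executor_a = KubernetesExecutor(team_name="team_a")
executor_b = KubernetesExecutor(team_name="team_b")
assert executor_a.kube_config.worker_pods_creation_batch_size == 2
assert executor_b.kube_config.worker_pods_creation_batch_size == 3
```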
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py:
##########
@@ -16,71 +16,95 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.settings import AIRFLOW_HOME
+if TYPE_CHECKING:
+    from airflow.executors.base_executor import ExecutorConf
+

class KubeConfig:
-    """Configuration for Kubernetes."""
+    """
+    Configuration for Kubernetes.
+
+    :param executor_conf: Optional team-aware configuration object. If not provided,
+        falls back to the global configuration for backwards compatibility. This parameter
+        supports the multi-team feature introduced in AIP-67.
+    """

    core_section = "core"
    kubernetes_section = "kubernetes_executor"
    logging_section = "logging"

-    def __init__(self):
+    def __init__(self, executor_conf: ExecutorConf | None = None):
+        # Use the provided executor_conf for team-aware configuration, or fall back to global conf
+        # for backwards compatibility with older versions of Airflow.
+        self._conf = executor_conf if executor_conf is not None else conf
+
        configuration_dict = conf.as_dict(display_sensitive=True)
        self.core_configuration = configuration_dict[self.core_section]
        self.airflow_home = AIRFLOW_HOME
-        self.dags_folder = conf.get(self.core_section, "dags_folder")
-        self.parallelism = conf.getint(self.core_section, "parallelism")
-        self.pod_template_file = conf.get(self.kubernetes_section, "pod_template_file", fallback=None)
+        self.dags_folder = self._conf.get(self.core_section, "dags_folder")
+        self.parallelism = self._conf.getint(self.core_section, "parallelism")
+        self.pod_template_file = self._conf.get(self.kubernetes_section, "pod_template_file", fallback=None)

-        self.delete_worker_pods = conf.getboolean(self.kubernetes_section, "delete_worker_pods")
-        self.delete_worker_pods_on_failure = conf.getboolean(
+        self.delete_worker_pods = self._conf.getboolean(self.kubernetes_section, "delete_worker_pods")
+        self.delete_worker_pods_on_failure = self._conf.getboolean(
            self.kubernetes_section, "delete_worker_pods_on_failure"
        )

-        self.worker_pod_pending_fatal_container_state_reasons = []
-        if conf.get(self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""):
+        self.worker_pod_pending_fatal_container_state_reasons: list[str] = []
+        fatal_reasons = self._conf.get(
+            self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""
+        )
+        if fatal_reasons:
Review Comment:
Nice optimization; the code was strange before with the double config read...
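(For anyone following the thread: the old shape read the same option twice, once for the truthiness check and once for the value, roughly as below; the trailing `.split(",")` is a reconstruction based on the list-typed attribute, not a verbatim quote of the old code.)

```python
# Reconstructed old shape: two reads of the same config option.
self.worker_pod_pending_fatal_container_state_reasons = []
if conf.get(self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""):
    self.worker_pod_pending_fatal_container_state_reasons = conf.get(
        self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons"
    ).split(",")
```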
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -79,14 +79,29 @@ class KubernetesExecutor(BaseExecutor):
    RUNNING_POD_LOG_LINES = 100
    supports_ad_hoc_ti_run: bool = True
+    supports_multi_team: bool = True

    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
        # In the v3 path, we store workloads, not commands as strings.
        # TODO: TaskSDK: move this type change into BaseExecutor
        queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: ignore[assignment]

-    def __init__(self):
-        self.kube_config = KubeConfig()
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Check if self has the ExecutorConf set on the self.conf attribute with all required methods.
+        # In older Airflow versions, ExecutorConf exists but lacks methods like getint, getboolean, etc.
+        # In such cases, fall back to the global configuration object.
+        # This keeps the changes backwards compatible with older versions of Airflow.
+        # Can be removed when the minimum supported provider version equals the version of core Airflow
+        # that introduces multi-team configuration (3.2+).
+        if not hasattr(self, "conf") or not hasattr(self.conf, "getint"):
+            self.conf = conf
+
+        self.kube_config = KubeConfig(executor_conf=self.conf)
Review Comment:
It's nice that kube_config is used in most places; it simplifies the changes in this class 👍
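For readers of the thread, construction then looks like this on both paths (sketch based on the tests above; passing `team_name` requires Airflow 3.2+):

```python
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor

# Global executor: on older Airflow, the hasattr check falls back to the global
# conf, so KubeConfig behaves exactly as before.
executor = KubernetesExecutor()

# Team-scoped executor (Airflow 3.2+): BaseExecutor sets self.conf to a
# team-aware ExecutorConf, which KubeConfig then reads section by section.
ml_executor = KubernetesExecutor(team_name="ml_team")
```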
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]