Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk merged PR #44568: URL: https://github.com/apache/airflow/pull/44568 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2535957319 Cool. The good thing is that all deprecations were already removed in the previous wave, so we will not have another bump in the k8s provider MAJOR version, and this one is good to go.
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2535376231 Hi @potiuk, the CI is finally green. Thank you very much for your guidance! The PR is ready for review and, hopefully, merging.
Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2523459474 Tests are failing: breeze (easier to fix) and compatibility tests with Airflow 2.8 - 2.9 (a bit more difficult). How to run the compatibility tests is described in detail in https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#compatibility-provider-unit-tests-against-older-airflow-releases - with some examples of how to deal with the tests.
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1873157303
## providers/src/airflow/providers/google/provider.yaml: ##
@@ -188,7 +188,7 @@ additional-extras:
       - apache-beam[gcp]
   - name: cncf.kubernetes
     dependencies:
-      - apache-airflow-providers-cncf-kubernetes>=7.2.0
+      - apache-airflow-providers-cncf-kubernetes>10.0.1
Review Comment: Awesome! Bumped the version
Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1873100162
## providers/src/airflow/providers/google/provider.yaml: ##
@@ -188,7 +188,7 @@ additional-extras:
       - apache-beam[gcp]
   - name: cncf.kubernetes
     dependencies:
-      - apache-airflow-providers-cncf-kubernetes>=7.2.0
+      - apache-airflow-providers-cncf-kubernetes>10.0.1
Review Comment: Actually this should be 10.1.0 (and cncf.kubernetes should also be bumped to 10.1.0 in its provider.yaml). We are effectively working on a "future" version of the cncf.kubernetes provider, which adds new features. Its version will be 10.1.0, and we should already set it now as part of this change. This is the only exception where the provider version should be modified by someone other than the release manager.
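To illustrate why the exact bound matters, here is a minimal sketch comparing the `>10.0.1` specifier from the diff with a `>=10.1.0` bound, which is one reading of the suggestion above. It uses plain tuple comparison rather than a real version parser, and the candidate `10.0.2` is a hypothetical patch release assumed not to contain the new Kueue operators.

```python
from __future__ import annotations


def parse(version: str) -> tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def satisfies_gt_10_0_1(version: str) -> bool:
    # Mirrors the specifier ">10.0.1" from the diff.
    return parse(version) > parse("10.0.1")


def satisfies_ge_10_1_0(version: str) -> bool:
    # Mirrors a ">=10.1.0" bound (an assumed reading of the review comment).
    return parse(version) >= parse("10.1.0")


# "10.0.2" is a hypothetical patch release used only for illustration.
for candidate in ("10.0.1", "10.0.2", "10.1.0"):
    print(candidate, satisfies_gt_10_0_1(candidate), satisfies_ge_10_1_0(candidate))
```

Under these assumptions, `>10.0.1` would accept a `10.0.2` release while `>=10.1.0` would not, which is why pinning to the future feature version is the stricter choice.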
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2522918768
> This is one option yes. There is another option. You can add the `cncf.kubernetes` provider to the list of "chicken-egg" providers in `src/airflow_breeze/global_constants.py` -> similarly to what I've done in #44714 (about to be merged).
Thank you for the suggestion! I followed this idea.
> Why not just use AirflowOptionalProviderFeatureException?
I've also considered this, but unfortunately the changes in this PR aren't optional. For example, `GKEStartKueueInsideClusterOperator` now inherits from `KubernetesInstallKueueOperator`, which is introduced in this PR. At the same time, `GKEStartKueueInsideClusterOperator` is a bit older and will break if the Kubernetes provider isn't upgraded. Thus I had to bump the cncf-kubernetes provider version in the google provider's dependencies.
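The distinction between an optional feature and a required base class can be sketched with stand-ins. Everything below is hypothetical and defined locally so the snippet runs without Airflow installed: `OptionalFeatureError` stands in for `AirflowOptionalProviderFeatureException`, and the module name `newer_provider_module` is invented to simulate an older provider that lacks the new operator.

```python
import importlib


class OptionalFeatureError(Exception):
    """Local stand-in for AirflowOptionalProviderFeatureException."""


def run_optional_feature() -> str:
    # An optional feature can defer the import to call time and fail only
    # when the feature is actually used.
    try:
        importlib.import_module("newer_provider_module")  # hypothetical module
    except ImportError as exc:
        raise OptionalFeatureError("Upgrade the provider to use this feature") from exc
    return "feature ran"


def define_subclass() -> type:
    # A base class is needed when the class statement executes, so the
    # import cannot be deferred: defining the subclass itself fails.
    module = importlib.import_module("newer_provider_module")  # hypothetical module
    base = module.InstallOperator  # hypothetical base class

    class StartInsideClusterOperator(base):
        pass

    return StartInsideClusterOperator


try:
    run_optional_feature()
except OptionalFeatureError as err:
    print("optional feature unavailable:", err)

try:
    define_subclass()
except ImportError as err:
    print("subclass cannot be defined:", type(err).__name__)
```

In the first case only one code path is unavailable; in the second the whole operator class is unusable, which matches the argument for a hard dependency bump.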
Re: [PR] New Kubernetes Kueue Operators [airflow]
eladkal commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2522777142
> > @eladkal, @potiuk,
> > This PR contains changes for both Google and cncf-kubernetes providers, and the Google provider depends on changes in cncf-kubernetes. For this reason I had to bump cncf-kubernetes version in Google provider's [dependencies](https://github.com/apache/airflow/pull/44568/files#diff-bfccba1ff8b4b3e462131ef3d0869d7760d64a2f3b6f1a15b89157a4c7d68404L191) and it now breaks CI because it can't install cncf-kubernetes provider that wasn't released yet. Therefore, should I split this PR, so we could release these changes gradually (cncf first, and google later) or we are OK with merging it all together now.
> This is one option yes. There is another option. You can add the `cncf.kubernetes` provider to the list of "chicken-egg" providers in `src/airflow_breeze/global_constants.py` -> similarly to what I've done in #44714 (about to be merged).
Why not just use AirflowOptionalProviderFeatureException?
Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2522694789 Also updated documentation about it here https://github.com/apache/airflow/pull/44720
Re: [PR] New Kubernetes Kueue Operators [airflow]
potiuk commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2522646768
> @eladkal, @potiuk,
> This PR contains changes for both Google and cncf-kubernetes providers, and the Google provider depends on changes in cncf-kubernetes. For this reason I had to bump cncf-kubernetes version in Google provider's [dependencies](https://github.com/apache/airflow/pull/44568/files#diff-bfccba1ff8b4b3e462131ef3d0869d7760d64a2f3b6f1a15b89157a4c7d68404L191) and it now breaks CI because it can't install cncf-kubernetes provider that wasn't released yet. Therefore, should I split this PR, so we could release these changes gradually (cncf first, and google later) or we are OK with merging it all together now.
This is one option yes. There is another option. You can add the `cncf.kubernetes` provider to the list of "chicken-egg" providers in `src/airflow_breeze/global_constants.py` -> similarly to what I've done in https://github.com/apache/airflow/pull/44714 (about to be merged).
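As a rough sketch of that second option, the entry in `src/airflow_breeze/global_constants.py` might look like the following. The constant name `CHICKEN_EGG_PROVIDERS` and its space-joined string format are assumptions here, not confirmed by this thread, so check the file itself before copying.

```python
# Assumed shape of the setting in src/airflow_breeze/global_constants.py.
# The name and format are assumptions. The idea is that providers listed
# here are treated specially by CI, so a dependent provider can require a
# version that has not been released yet.
CHICKEN_EGG_PROVIDERS = " ".join(["cncf.kubernetes"])

print(CHICKEN_EGG_PROVIDERS.split())
```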
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on PR #44568: URL: https://github.com/apache/airflow/pull/44568#issuecomment-2517821458 @eladkal, @potiuk, This PR contains changes for both the Google and cncf-kubernetes providers, and the Google provider depends on the changes in cncf-kubernetes. For this reason I had to bump the cncf-kubernetes version in the Google provider's [dependencies](https://github.com/apache/airflow/pull/44568/files#diff-bfccba1ff8b4b3e462131ef3d0869d7760d64a2f3b6f1a15b89157a4c7d68404L191), and it now breaks CI because CI can't install a cncf-kubernetes provider version that hasn't been released yet. Should I split this PR so we can release these changes gradually (cncf first, google later), or are we OK with merging it all together now?
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1869808011
## providers/tests/deprecations_ignore.yml: ##
@@ -94,7 +94,6 @@
 - providers/tests/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
 - providers/tests/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
 - providers/tests/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
-- providers/tests/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
Review Comment: This unit test was replaced by the new test during GKE operators refactoring. And because it's no longer present, we don't have to ignore it anymore.
## providers/src/airflow/providers/cncf/kubernetes/operators/kueue.py: ##
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Kueue."""
+
+from __future__ import annotations
+
+import json
+import warnings
+from collections.abc import Sequence
+from functools import cached_property
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+
+
+class KubernetesInstallKueueOperator(BaseOperator):
Review Comment: Done.
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1869807546
## providers/src/airflow/providers/cncf/kubernetes/operators/kueue.py: ##
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Kueue."""
+
+from __future__ import annotations
+
+import json
+import warnings
+from collections.abc import Sequence
+from functools import cached_property
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+
+
+class KubernetesInstallKueueOperator(BaseOperator):
Review Comment: This unit test was replaced by the new test during GKE operators refactoring. And because it's no longer present, we don't have to ignore it anymore.
Re: [PR] New Kubernetes Kueue Operators [airflow]
moiseenkov commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1869805951
## providers/src/airflow/providers/cncf/kubernetes/operators/kueue.py: ##
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Kueue."""
+
+from __future__ import annotations
+
+import json
+import warnings
+from collections.abc import Sequence
+from functools import cached_property
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+
+
+class KubernetesInstallKueueOperator(BaseOperator):
+    """
+    Installs a Kubernetes Kueue.
+
+    :param kueue_version: The Kubernetes Kueue version to install.
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id `
+        for the Kubernetes cluster.
+    """
+
+    template_fields: Sequence[str] = (
+        "kueue_version",
+        "kubernetes_conn_id",
+    )
+
+    def __init__(self, kueue_version: str, kubernetes_conn_id: str = "kubernetes_default", *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.kueue_version = kueue_version
+        self._kueue_yaml_url = (
+            f"https://github.com/kubernetes-sigs/kueue/releases/download/{self.kueue_version}/manifests.yaml"
+        )
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        return KubernetesHook(conn_id=self.kubernetes_conn_id)
+
+    def execute(self, context):
+        yaml_objects = self.hook.get_yaml_content_from_file(kueue_yaml_url=self._kueue_yaml_url)
+        try:
+            self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
+        except FailToCreateError as ex:
+            error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
+            if next((e for e in error_bodies if e.get("reason") == "AlreadyExists"), None):
+                self.log.info("Kueue is already enabled for the cluster")
+
+            if errors := [e for e in error_bodies if e.get("reason") != "AlreadyExists"]:
+                error_message = "\n".join(e.get("body") for e in errors)
+                raise AirflowException(error_message)
+            return
+
+        self.hook.check_kueue_deployment_running(name="kueue-controller-manager", namespace="kueue-system")
+        self.log.info("Kueue installed successfully!")
+
+
+class KubernetesStartKueueJobOperator(KubernetesJobOperator):
+    """
+    Executes a Kubernetes Job in Kueue.
+
+    :param queue_name: The name of the Queue in the cluster
+    """
+
+    template_fields = tuple({"queue_name"} | set(KubernetesJobOperator.template_fields))
+
+    def __init__(self, queue_name: str, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.queue_name = queue_name
+
+        if self.suspend is False:
+            raise AirflowException(
+                "The `suspend` parameter can't be False. If you want to use Kueue for running Job"
+                " in a Kubernetes cluster, set the `suspend` parameter to True.",
+            )
+        elif self.suspend is None:
+            warnings.warn(
+                f"You have not set parameter `suspend` in class {self.__class__.__name__}. "
+                "For running a Job in Kueue the `suspend` parameter should set to True.",
+                UserWarning,
+                stacklevel=2,
+            )
+            self.suspend = True
Review Comment: Good point! I replaced the warning with an informational message here, so users won't be surprised that the `suspend` parameter was forcibly changed.
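A minimal sketch of the behaviour described in the reply, using a stand-in class so it runs without Airflow. `SketchKueueJob` is hypothetical, `ValueError` stands in for `AirflowException`, and the log wording and the queue name `local-queue` are assumptions; the actual change in the PR may differ.

```python
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


class SketchKueueJob:
    """Hypothetical stand-in for KubernetesStartKueueJobOperator."""

    def __init__(self, queue_name: str, suspend: bool | None = None) -> None:
        self.queue_name = queue_name
        if suspend is False:
            # An explicit False conflicts with running the Job in Kueue,
            # so reject it instead of silently overriding the user's choice.
            raise ValueError("The `suspend` parameter can't be False when using Kueue.")
        if suspend is None:
            # Inform rather than warn: the value is being set on the user's behalf.
            log.info("`suspend` was not set for %s; setting it to True.", type(self).__name__)
        self.suspend = True


# "local-queue" is an illustrative queue name, not taken from the PR.
job = SketchKueueJob(queue_name="local-queue")
print(job.suspend)
```

Logging at info level keeps the forced default visible in task logs without surfacing a `UserWarning` for a case the operator handles on its own.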
Re: [PR] New Kubernetes Kueue Operators [airflow]
eladkal commented on code in PR #44568: URL: https://github.com/apache/airflow/pull/44568#discussion_r1867249959
## providers/tests/deprecations_ignore.yml: ##
@@ -94,7 +94,6 @@
 - providers/tests/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
 - providers/tests/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
 - providers/tests/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
-- providers/tests/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
Review Comment: How is this change related to the PR?
## providers/src/airflow/providers/cncf/kubernetes/operators/kueue.py: ##
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Kueue."""
+
+from __future__ import annotations
+
+import json
+import warnings
+from collections.abc import Sequence
+from functools import cached_property
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+
+
+class KubernetesInstallKueueOperator(BaseOperator):
+    """
+    Installs a Kubernetes Kueue.
+
+    :param kueue_version: The Kubernetes Kueue version to install.
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id `
+        for the Kubernetes cluster.
+    """
+
+    template_fields: Sequence[str] = (
+        "kueue_version",
+        "kubernetes_conn_id",
+    )
+
+    def __init__(self, kueue_version: str, kubernetes_conn_id: str = "kubernetes_default", *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.kueue_version = kueue_version
+        self._kueue_yaml_url = (
+            f"https://github.com/kubernetes-sigs/kueue/releases/download/{self.kueue_version}/manifests.yaml"
+        )
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        return KubernetesHook(conn_id=self.kubernetes_conn_id)
+
+    def execute(self, context):
+        yaml_objects = self.hook.get_yaml_content_from_file(kueue_yaml_url=self._kueue_yaml_url)
+        try:
+            self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
+        except FailToCreateError as ex:
+            error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
+            if next((e for e in error_bodies if e.get("reason") == "AlreadyExists"), None):
+                self.log.info("Kueue is already enabled for the cluster")
+
+            if errors := [e for e in error_bodies if e.get("reason") != "AlreadyExists"]:
+                error_message = "\n".join(e.get("body") for e in errors)
+                raise AirflowException(error_message)
+            return
+
+        self.hook.check_kueue_deployment_running(name="kueue-controller-manager", namespace="kueue-system")
+        self.log.info("Kueue installed successfully!")
+
+
+class KubernetesStartKueueJobOperator(KubernetesJobOperator):
+    """
+    Executes a Kubernetes Job in Kueue.
+
+    :param queue_name: The name of the Queue in the cluster
+    """
+
+    template_fields = tuple({"queue_name"} | set(KubernetesJobOperator.template_fields))
+
+    def __init__(self, queue_name: str, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.queue_name = queue_name
+
+        if self.suspend is False:
+            raise AirflowException(
+                "The `suspend` parameter can't be False. If you want to use Kueue for running Job"
+                " in a Kubernetes cluster, set the `suspend` parameter to True.",
+            )
+        elif self.suspend is None:
+            warnings.warn(
+                f"You have not set parameter `suspend` in class {self.__class__.__name__}. "
+                "For running a Job in Kueue the `suspend` parameter should set to True.",
+                UserWarning,
+                stacklevel=2,
+            )