This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 0776bdbad1 Resolve google cloud kubernetes operator and trigger deprecations in tests (#40072) 0776bdbad1 is described below commit 0776bdbad15cf5fa0bd318d3faf889146b72f412 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Thu Jun 6 17:46:58 2024 +0530 Resolve google cloud kubernetes operator and trigger deprecations in tests (#40072) --- tests/deprecations_ignore.yml | 18 ----- .../cloud/operators/test_kubernetes_engine.py | 78 +++++++++++++++++----- .../cloud/triggers/test_kubernetes_engine.py | 2 +- 3 files changed, 64 insertions(+), 34 deletions(-) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 02712ddb59..57dbb7a605 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -242,16 +242,6 @@ - tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links - tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links - tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields -- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute -- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method - tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body - tests/providers/google/cloud/operators/test_life_sciences.py::TestLifeSciencesRunPipelineOperator::test_executes - tests/providers/google/cloud/operators/test_life_sciences.py::TestLifeSciencesRunPipelineOperator::test_executes_without_project_id @@ -346,14 +336,6 @@ - tests/providers/google/cloud/transfers/test_gcs_to_gcs.py::TestGoogleCloudStorageToCloudStorageOperator::test_wc_with_last_modified_time_with_all_true_cond - tests/providers/google/cloud/transfers/test_gcs_to_gcs.py::TestGoogleCloudStorageToCloudStorageOperator::test_wc_with_last_modified_time_with_one_true_cond - tests/providers/google/cloud/transfers/test_gcs_to_gcs.py::TestGoogleCloudStorageToCloudStorageOperator::test_wc_with_no_last_modified_time -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_define_container_state_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_logging_in_trigger_when_exception_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_logging_in_trigger_when_fail_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_run_loop_return_failed_event_should_execute_successfully -- 
tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_run_loop_return_running_event_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_run_loop_return_success_event_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_run_loop_return_waiting_event_should_execute_successfully -- tests/providers/google/cloud/triggers/test_kubernetes_engine.py::TestGKEStartPodTrigger::test_serialize_should_execute_successfully - tests/providers/google/cloud/utils/test_mlengine_operator_utils.py::TestMlengineOperatorUtils::test_apply_validate_fn - tests/providers/google/cloud/utils/test_mlengine_operator_utils.py::TestMlengineOperatorUtils::test_create_evaluate_ops - tests/providers/google/cloud/utils/test_mlengine_operator_utils.py::TestMlengineOperatorUtils::test_create_evaluate_ops_dag diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index 2e27db59b3..f8416c2189 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -26,7 +26,7 @@ from google.cloud.container_v1.types import Cluster, NodePool from kubernetes.client.models import V1Deployment, V1DeploymentStatus from kubernetes.utils.create_from_yaml import FailToCreateError -from airflow.exceptions import AirflowException, TaskDeferred +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred from airflow.models import Connection from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator @@ -143,9 +143,21 @@ class TestGoogleCloudPlatformContainerOperator: @mock.patch(GKE_HOOK_PATH) def 
test_create_execute(self, mock_hook, body): print("type: ", type(body)) - operator = GKECreateClusterOperator( - project_id=TEST_GCP_PROJECT_ID, location=PROJECT_LOCATION, body=body, task_id=PROJECT_TASK_ID - ) + if body == PROJECT_BODY_CREATE_DICT or body == PROJECT_BODY_CREATE_CLUSTER: + with pytest.warns( + AirflowProviderDeprecationWarning, + match="The body field 'initial_node_count' is deprecated. Use 'node_pool.initial_node_count' instead.", + ): + operator = GKECreateClusterOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + body=body, + task_id=PROJECT_TASK_ID, + ) + else: + operator = GKECreateClusterOperator( + project_id=TEST_GCP_PROJECT_ID, location=PROJECT_LOCATION, body=body, task_id=PROJECT_TASK_ID + ) operator.execute(context=mock.MagicMock()) mock_hook.return_value.create_cluster.assert_called_once_with( @@ -224,7 +236,7 @@ class TestGoogleCloudPlatformContainerOperator: operator = GKECreateClusterOperator( project_id=TEST_GCP_PROJECT_ID, location=PROJECT_LOCATION, - body=PROJECT_BODY_CREATE_DICT, + body=PROJECT_BODY_CREATE_DICT_NODE_POOLS, task_id=PROJECT_TASK_ID, deferrable=True, ) @@ -294,6 +306,7 @@ class TestGKEPodOperator: name=TASK_NAME, namespace=NAMESPACE, image=IMAGE, + on_finish_action=OnFinishAction.KEEP_POD, ) self.gke_op.pod = mock.MagicMock( name=TASK_NAME, @@ -322,6 +335,7 @@ class TestGKEPodOperator: namespace=NAMESPACE, image=IMAGE, config_file="/path/to/alternative/kubeconfig", + on_finish_action=OnFinishAction.KEEP_POD, ) @mock.patch.dict(os.environ, {}) @@ -375,6 +389,7 @@ class TestGKEPodOperator: namespace=NAMESPACE, image=IMAGE, use_internal_ip=use_internal_ip, + on_finish_action=OnFinishAction.KEEP_POD, ) cluster_url, ssl_ca_cert = gke_op.fetch_cluster_info() @@ -391,6 +406,7 @@ class TestGKEPodOperator: name=TASK_NAME, namespace=NAMESPACE, image=IMAGE, + on_finish_action=OnFinishAction.KEEP_POD, ) gke_op._cluster_url = CLUSTER_URL gke_op._ssl_ca_cert = SSL_CA_CERT @@ -412,6 +428,7 @@ class 
TestGKEPodOperator: namespace=NAMESPACE, image=IMAGE, gcp_conn_id="test_conn", + on_finish_action=OnFinishAction.KEEP_POD, ) gke_op._cluster_url = CLUSTER_URL gke_op._ssl_ca_cert = SSL_CA_CERT @@ -466,16 +483,47 @@ class TestGKEPodOperator: kpo_init_args_mock = mock.MagicMock(**{"parameters": ["on_finish_action"] if compatible_kpo else []}) with mock.patch("inspect.signature", return_value=kpo_init_args_mock): - op = GKEStartPodOperator( - project_id=TEST_GCP_PROJECT_ID, - location=PROJECT_LOCATION, - cluster_name=CLUSTER_NAME, - task_id=PROJECT_TASK_ID, - name=TASK_NAME, - namespace=NAMESPACE, - image=IMAGE, - **kwargs, - ) + if "is_delete_operator_pod" in kwargs: + with pytest.warns( + AirflowProviderDeprecationWarning, + match="`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`", + ): + op = GKEStartPodOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + image=IMAGE, + **kwargs, + ) + elif "on_finish_action" not in kwargs: + with pytest.warns( + AirflowProviderDeprecationWarning, + match="You have not set parameter `on_finish_action` in class GKEStartPodOperator. Currently the default for this parameter is `keep_pod` but in a future release the default will be changed to `delete_pod`. 
To ensure pods are not deleted in the future you will need to set `on_finish_action=keep_pod` explicitly.", + ): + op = GKEStartPodOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + image=IMAGE, + **kwargs, + ) + else: + op = GKEStartPodOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + image=IMAGE, + **kwargs, + ) for expected_attr in expected_attributes: assert op.__getattribute__(expected_attr) == expected_attributes[expected_attr] diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index 8a43f3627c..88f92e780c 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -73,7 +73,6 @@ def trigger(): poll_interval=POLL_INTERVAL, cluster_context=CLUSTER_CONTEXT, in_cluster=IN_CLUSTER, - should_delete_pod=SHOULD_DELETE_POD, get_logs=GET_LOGS, startup_timeout=STARTUP_TIMEOUT_SECS, trigger_start_time=TRIGGER_START_TIME, @@ -82,6 +81,7 @@ def trigger(): base_container_name=BASE_CONTAINER_NAME, gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, + on_finish_action=ON_FINISH_ACTION, )