This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 99f320354b Refactor: consolidate import time in providers (#34402) 99f320354b is described below commit 99f320354b075fb780e54057d223d2d16ddf08b8 Author: Miroslav Šedivý <6774676+eum...@users.noreply.github.com> AuthorDate: Wed Oct 4 14:11:56 2023 +0000 Refactor: consolidate import time in providers (#34402) --- .../providers/alibaba/cloud/operators/analyticdb_spark.py | 4 ++-- airflow/providers/amazon/aws/hooks/batch_client.py | 4 ++-- .../amazon/aws/hooks/elasticache_replication_group.py | 6 +++--- airflow/providers/amazon/aws/hooks/emr.py | 4 ++-- airflow/providers/amazon/aws/hooks/redshift_data.py | 4 ++-- airflow/providers/amazon/aws/hooks/s3.py | 4 ++-- airflow/providers/amazon/aws/operators/appflow.py | 4 ++-- airflow/providers/apache/livy/operators/livy.py | 4 ++-- airflow/providers/elasticsearch/log/es_task_handler.py | 4 ++-- airflow/providers/google/cloud/hooks/cloud_batch.py | 4 ++-- airflow/providers/google/cloud/hooks/datafusion.py | 12 ++++++------ airflow/providers/google/cloud/operators/datafusion.py | 4 ++-- airflow/providers/google/cloud/operators/dataplex.py | 6 +++--- .../providers/google/cloud/operators/dataproc_metastore.py | 6 +++--- .../microsoft/azure/operators/container_instances.py | 4 ++-- tests/providers/amazon/aws/hooks/test_batch_client.py | 8 ++++---- .../azure/operators/test_azure_container_instances.py | 2 +- tests/system/providers/amazon/aws/utils/__init__.py | 4 ++-- 18 files changed, 44 insertions(+), 44 deletions(-) diff --git a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py index 79834dbe65..106f7583cb 100644 --- a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py +++ b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py @@ -17,8 +17,8 @@ # under the License. from __future__ import annotations +import time from functools import cached_property -from time import sleep from typing import TYPE_CHECKING, Any, Sequence from deprecated.classic import deprecated @@ -78,7 +78,7 @@ class AnalyticDBSparkBaseOperator(BaseOperator): state = self.hook.get_spark_state(app_id) while AppState(state) not in AnalyticDBSparkHook.TERMINAL_STATES: self.log.debug("Application with id %s is in state: %s", app_id, state) - sleep(self.polling_interval) + time.sleep(self.polling_interval) state = self.hook.get_spark_state(app_id) self.log.info("Application with id %s terminated with state: %s", app_id, state) self.log.info( diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index a7c973bf1e..af9d79c1ae 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -28,7 +28,7 @@ from __future__ import annotations import itertools import random -from time import sleep +import time from typing import TYPE_CHECKING, Callable import botocore.client @@ -549,7 +549,7 @@ class BatchClientHook(AwsBaseHook): delay = random.uniform(BatchClientHook.DEFAULT_DELAY_MIN, BatchClientHook.DEFAULT_DELAY_MAX) else: delay = BatchClientHook.add_jitter(delay) - sleep(delay) + time.sleep(delay) @staticmethod def exponential_delay(tries: int) -> float: diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py index 5dd227d7ee..d37f0babba 100644 --- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py +++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py @@ -17,7 +17,7 @@ # under the License. from __future__ import annotations -from time import sleep +import time from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -160,7 +160,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook): self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time) - sleep(sleep_time) + time.sleep(sleep_time) sleep_time *= exponential_back_off_factor @@ -240,7 +240,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook): self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time) - sleep(sleep_time) + time.sleep(sleep_time) sleep_time *= exponential_back_off_factor diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 1ff2129277..17f85cc8a5 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -18,8 +18,8 @@ from __future__ import annotations import json +import time import warnings -from time import sleep from typing import Any from botocore.exceptions import ClientError @@ -509,7 +509,7 @@ class EmrContainerHook(AwsBaseHook): final_query_state = query_state break try_number += 1 - sleep(poll_interval) + time.sleep(poll_interval) return final_query_state def stop_query(self, job_id: str) -> dict: diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py index 110ef9dad1..f7df0fd744 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_data.py +++ b/airflow/providers/amazon/aws/hooks/redshift_data.py @@ -17,8 +17,8 @@ # under the License. from __future__ import annotations +import time from pprint import pformat -from time import sleep from typing import TYPE_CHECKING, Any, Iterable from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook @@ -127,7 +127,7 @@ class RedshiftDataHook(AwsGenericHook["RedshiftDataAPIServiceClient"]): ) else: self.log.info("Query %s", status) - sleep(poll_interval) + time.sleep(poll_interval) def get_table_primary_key( self, diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 9eeeaf876b..f36c671c95 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -26,6 +26,7 @@ import logging import os import re import shutil +import time import warnings from contextlib import suppress from copy import deepcopy @@ -35,7 +36,6 @@ from inspect import signature from io import BytesIO from pathlib import Path from tempfile import NamedTemporaryFile, gettempdir -from time import sleep from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast from urllib.parse import urlsplit from uuid import uuid4 @@ -1289,7 +1289,7 @@ class S3Hook(AwsBaseHook): if not bucket_keys: break if retry: # Avoid first loop - sleep(500) + time.sleep(500) self.delete_objects(bucket=bucket_name, keys=bucket_keys) diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py index 7b7402acde..184fc7fab1 100644 --- a/airflow/providers/amazon/aws/operators/appflow.py +++ b/airflow/providers/amazon/aws/operators/appflow.py @@ -16,10 +16,10 @@ # under the License. from __future__ import annotations +import time import warnings from datetime import datetime, timedelta from functools import cached_property -from time import sleep from typing import TYPE_CHECKING, cast from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning @@ -107,7 +107,7 @@ class AppflowBaseOperator(BaseOperator): self._update_flow() # while schedule flows will pick up the update right away, on-demand flows might use out of date # info if triggered right after an update, so we need to wait a bit for the DB to be consistent. - sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME) + time.sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME) self._run_flow(context) diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index bcf36b50ca..8f4fa75527 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -17,8 +17,8 @@ """This module contains the Apache Livy operator.""" from __future__ import annotations +import time from functools import cached_property -from time import sleep from typing import TYPE_CHECKING, Any, Sequence from deprecated.classic import deprecated @@ -189,7 +189,7 @@ class LivyOperator(BaseOperator): state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args) while state not in self.hook.TERMINAL_STATES: self.log.debug("Batch with id %s is in state: %s", batch_id, state.value) - sleep(self._polling_interval) + time.sleep(self._polling_interval) state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args) self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value) self.hook.dump_batch_logs(batch_id) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index e227ca85ca..070d81543f 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -21,10 +21,10 @@ import contextlib import inspect import logging import sys +import time import warnings from collections import defaultdict from operator import attrgetter -from time import time from typing import TYPE_CHECKING, Any, Callable, List, Tuple from urllib.parse import quote, urlparse @@ -372,7 +372,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix def emit(self, record): if self.handler: - setattr(record, self.offset_field, int(time() * (10**9))) + setattr(record, self.offset_field, int(time.time() * (10**9))) self.handler.emit(record) def set_context(self, ti: TaskInstance) -> None: diff --git a/airflow/providers/google/cloud/hooks/cloud_batch.py b/airflow/providers/google/cloud/hooks/cloud_batch.py index ef642e4ec9..5ef56e4427 100644 --- a/airflow/providers/google/cloud/hooks/cloud_batch.py +++ b/airflow/providers/google/cloud/hooks/cloud_batch.py @@ -19,7 +19,7 @@ from __future__ import annotations import itertools import json -from time import sleep +import time from typing import TYPE_CHECKING, Iterable, Sequence from google.cloud.batch import ListJobsRequest, ListTasksRequest @@ -152,7 +152,7 @@ class CloudBatchHook(GoogleBaseHook): ): return job else: - sleep(polling_period_seconds) + time.sleep(polling_period_seconds) except Exception as e: self.log.exception("Exception occurred while checking for job completion.") raise e diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py index b3cfd6ee9b..3c8d1b7453 100644 --- a/airflow/providers/google/cloud/hooks/datafusion.py +++ b/airflow/providers/google/cloud/hooks/datafusion.py @@ -20,7 +20,7 @@ from __future__ import annotations import asyncio import json import os -from time import monotonic, sleep +import time from typing import Any, Dict, Sequence from urllib.parse import quote, urlencode, urljoin @@ -91,7 +91,7 @@ class DataFusionHook(GoogleBaseHook): def wait_for_operation(self, operation: dict[str, Any]) -> dict[str, Any]: """Waits for long-lasting operation to complete.""" for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): - sleep(time_to_wait) + time.sleep(time_to_wait) operation = ( self.get_conn().projects().locations().operations().get(name=operation.get("name")).execute() ) @@ -115,9 +115,9 @@ class DataFusionHook(GoogleBaseHook): """Polls pipeline state and raises an exception if the state fails or times out.""" failure_states = failure_states or FAILURE_STATES success_states = success_states or SUCCESS_STATES - start_time = monotonic() + start_time = time.monotonic() current_state = None - while monotonic() - start_time < timeout: + while time.monotonic() - start_time < timeout: try: workflow = self.get_pipeline_workflow( pipeline_name=pipeline_name, @@ -135,7 +135,7 @@ class DataFusionHook(GoogleBaseHook): raise AirflowException( f"Pipeline {pipeline_name} state {current_state} is not one of {success_states}" ) - sleep(30) + time.sleep(30) # Time is up! raise AirflowException( @@ -393,7 +393,7 @@ class DataFusionHook(GoogleBaseHook): ) except ConflictException as exc: self.log.info(exc) - sleep(time_to_wait) + time.sleep(time_to_wait) else: if response.status == 200: break diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py index e5d0dec5df..b2149495f9 100644 --- a/airflow/providers/google/cloud/operators/datafusion.py +++ b/airflow/providers/google/cloud/operators/datafusion.py @@ -17,7 +17,7 @@ """This module contains Google DataFusion operators.""" from __future__ import annotations -from time import sleep +import time from typing import TYPE_CHECKING, Any, Sequence from google.api_core.retry import exponential_sleep_generator @@ -267,7 +267,7 @@ class CloudDataFusionCreateInstanceOperator(GoogleCloudBaseOperator): for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): if instance["state"] != "CREATING": break - sleep(time_to_wait) + time.sleep(time_to_wait) instance = hook.get_instance( instance_name=self.instance_name, location=self.location, project_id=self.project_id ) diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py index 3a85961d2d..4fdd9ab970 100644 --- a/airflow/providers/google/cloud/operators/dataplex.py +++ b/airflow/providers/google/cloud/operators/dataplex.py @@ -18,7 +18,7 @@ from __future__ import annotations -from time import sleep +import time from typing import TYPE_CHECKING, Any, Sequence from airflow.exceptions import AirflowException @@ -165,7 +165,7 @@ class DataplexCreateTaskOperator(GoogleCloudBaseOperator): ) if task["state"] != "CREATING": break - sleep(time_to_wait) + time.sleep(time_to_wait) return Task.to_dict(task) @@ -534,7 +534,7 @@ class DataplexCreateLakeOperator(GoogleCloudBaseOperator): ) if lake["state"] != "CREATING": break - sleep(time_to_wait) + time.sleep(time_to_wait) DataplexLakeLink.persist( context=context, task_instance=self, diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py index c27c5dc314..af88728fb6 100644 --- a/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -18,7 +18,7 @@ """This module contains Google Dataproc Metastore operators.""" from __future__ import annotations -from time import sleep +import time from typing import TYPE_CHECKING, Sequence from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault @@ -700,7 +700,7 @@ class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator): the SDK. """ for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): - sleep(time_to_wait) + time.sleep(time_to_wait) service = hook.get_service( region=self.region, project_id=self.project_id, @@ -986,7 +986,7 @@ class DataprocMetastoreRestoreServiceOperator(GoogleCloudBaseOperator): the SDK. """ for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): - sleep(time_to_wait) + time.sleep(time_to_wait) service = hook.get_service( region=self.region, project_id=self.project_id, diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py b/airflow/providers/microsoft/azure/operators/container_instances.py index 73c082003e..c104995aff 100644 --- a/airflow/providers/microsoft/azure/operators/container_instances.py +++ b/airflow/providers/microsoft/azure/operators/container_instances.py @@ -18,8 +18,8 @@ from __future__ import annotations import re +import time from collections import namedtuple -from time import sleep from typing import TYPE_CHECKING, Any, Sequence from azure.mgmt.containerinstance.models import ( @@ -348,7 +348,7 @@ class AzureContainerInstancesOperator(BaseOperator): except Exception: self.log.exception("Exception while getting container groups") - sleep(1) + time.sleep(1) def _log_last(self, logs: list | None, last_line_logged: Any) -> Any | None: if logs: diff --git a/tests/providers/amazon/aws/hooks/test_batch_client.py b/tests/providers/amazon/aws/hooks/test_batch_client.py index e501da3c06..39dde3f141 100644 --- a/tests/providers/amazon/aws/hooks/test_batch_client.py +++ b/tests/providers/amazon/aws/hooks/test_batch_client.py @@ -427,7 +427,7 @@ class TestBatchClientDelays: assert result <= width @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform") - @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep") def test_delay_defaults(self, mock_sleep, mock_uniform): assert BatchClientHook.DEFAULT_DELAY_MIN == 1 assert BatchClientHook.DEFAULT_DELAY_MAX == 10 @@ -439,21 +439,21 @@ class TestBatchClientDelays: mock_sleep.assert_called_once_with(0) @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform") - @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep") def test_delay_with_zero(self, mock_sleep, mock_uniform): self.batch_client.delay(0) mock_uniform.assert_called_once_with(0, 1) # in add_jitter mock_sleep.assert_called_once_with(mock_uniform.return_value) @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform") - @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep") def test_delay_with_int(self, mock_sleep, mock_uniform): self.batch_client.delay(5) mock_uniform.assert_called_once_with(4, 6) # in add_jitter mock_sleep.assert_called_once_with(mock_uniform.return_value) @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform") - @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep") def test_delay_with_float(self, mock_sleep, mock_uniform): self.batch_client.delay(5.0) mock_uniform.assert_called_once_with(4.0, 6.0) # in add_jitter diff --git a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py index 2984fe60ee..35fdc2b8b9 100644 --- a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py +++ b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py @@ -345,7 +345,7 @@ class TestACIOperator: ) @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook") - @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.sleep") + @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.time.sleep") def test_execute_correct_sleep_cycle(self, sleep_mock, aci_mock): expected_cg1 = make_mock_container(state="Running", exit_code=0, detail_status="test") expected_cg2 = make_mock_container(state="Terminated", exit_code=0, detail_status="test") diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py index 6515fe6552..b521e3dedd 100644 --- a/tests/system/providers/amazon/aws/utils/__init__.py +++ b/tests/system/providers/amazon/aws/utils/__init__.py @@ -20,8 +20,8 @@ import inspect import json import logging import os +import time from pathlib import Path -from time import sleep from typing import TYPE_CHECKING from uuid import uuid4 @@ -331,7 +331,7 @@ def _purge_logs( if not retry or retry_times == 0 or e.response["Error"]["Code"] != "ResourceNotFoundException": raise e - sleep(PURGE_LOGS_INTERVAL_PERIOD) + time.sleep(PURGE_LOGS_INTERVAL_PERIOD) _purge_logs( test_logs=test_logs, force_delete=force_delete,