This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99f320354b Refactor: consolidate import time in providers (#34402)
99f320354b is described below

commit 99f320354b075fb780e54057d223d2d16ddf08b8
Author: Miroslav Šedivý <6774676+eum...@users.noreply.github.com>
AuthorDate: Wed Oct 4 14:11:56 2023 +0000

    Refactor: consolidate import time in providers (#34402)
---
 .../providers/alibaba/cloud/operators/analyticdb_spark.py    |  4 ++--
 airflow/providers/amazon/aws/hooks/batch_client.py           |  4 ++--
 .../amazon/aws/hooks/elasticache_replication_group.py        |  6 +++---
 airflow/providers/amazon/aws/hooks/emr.py                    |  4 ++--
 airflow/providers/amazon/aws/hooks/redshift_data.py          |  4 ++--
 airflow/providers/amazon/aws/hooks/s3.py                     |  4 ++--
 airflow/providers/amazon/aws/operators/appflow.py            |  4 ++--
 airflow/providers/apache/livy/operators/livy.py              |  4 ++--
 airflow/providers/elasticsearch/log/es_task_handler.py       |  4 ++--
 airflow/providers/google/cloud/hooks/cloud_batch.py          |  4 ++--
 airflow/providers/google/cloud/hooks/datafusion.py           | 12 ++++++------
 airflow/providers/google/cloud/operators/datafusion.py       |  4 ++--
 airflow/providers/google/cloud/operators/dataplex.py         |  6 +++---
 .../providers/google/cloud/operators/dataproc_metastore.py   |  6 +++---
 .../microsoft/azure/operators/container_instances.py         |  4 ++--
 tests/providers/amazon/aws/hooks/test_batch_client.py        |  8 ++++----
 .../azure/operators/test_azure_container_instances.py        |  2 +-
 tests/system/providers/amazon/aws/utils/__init__.py          |  4 ++--
 18 files changed, 44 insertions(+), 44 deletions(-)
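
The whole patch applies one mechanical pattern: replace "from time import sleep" (or "from time import time" / "monotonic" in the elasticsearch and datafusion files) with a plain "import time", then call time.sleep(...), time.time(), or time.monotonic() at each use site. A minimal before/after sketch of the pattern, using an illustrative module layout that is not part of the patch:

    # before.py -- the function object is bound into the importing module's
    # namespace, so tests must patch "before.sleep" and the bare name can
    # collide with local helpers
    from time import sleep

    def wait(interval: float) -> None:
        sleep(interval)

    # after.py -- only the module name is imported; the call site reads
    # unambiguously as stdlib time, and tests patch "after.time.sleep"
    import time

    def wait(interval: float) -> None:
        time.sleep(interval)

The test updates near the bottom of the diff follow the same shift: mock targets move from "...batch_client.sleep" to "...batch_client.time.sleep".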

diff --git a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
index 79834dbe65..106f7583cb 100644
--- a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -17,8 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated.classic import deprecated
@@ -78,7 +78,7 @@ class AnalyticDBSparkBaseOperator(BaseOperator):
         state = self.hook.get_spark_state(app_id)
         while AppState(state) not in AnalyticDBSparkHook.TERMINAL_STATES:
             self.log.debug("Application with id %s is in state: %s", app_id, 
state)
-            sleep(self.polling_interval)
+            time.sleep(self.polling_interval)
             state = self.hook.get_spark_state(app_id)
         self.log.info("Application with id %s terminated with state: %s", 
app_id, state)
         self.log.info(
diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py
index a7c973bf1e..af9d79c1ae 100644
--- a/airflow/providers/amazon/aws/hooks/batch_client.py
+++ b/airflow/providers/amazon/aws/hooks/batch_client.py
@@ -28,7 +28,7 @@ from __future__ import annotations
 
 import itertools
 import random
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Callable
 
 import botocore.client
@@ -549,7 +549,7 @@ class BatchClientHook(AwsBaseHook):
             delay = random.uniform(BatchClientHook.DEFAULT_DELAY_MIN, BatchClientHook.DEFAULT_DELAY_MAX)
         else:
             delay = BatchClientHook.add_jitter(delay)
-        sleep(delay)
+        time.sleep(delay)
 
     @staticmethod
     def exponential_delay(tries: int) -> float:
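
A side note on the hunk above: the hook sleeps for a jittered delay so that many concurrent Batch jobs do not poll in lockstep. A self-contained sketch of that idea, assuming the jitter semantics implied by the test expectations further down (uniform(0, 1) for delay=0, uniform(4, 6) for delay=5); jittered_delay is an illustrative name, not the hook's API:

    from __future__ import annotations

    import random
    import time

    DEFAULT_DELAY_MIN = 1   # seconds, mirrors BatchClientHook.DEFAULT_DELAY_MIN
    DEFAULT_DELAY_MAX = 10  # seconds, mirrors BatchClientHook.DEFAULT_DELAY_MAX

    def jittered_delay(delay: float | None = None) -> None:
        if delay is None:
            # no caller preference: pick anywhere in the default window
            delay = random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)
        else:
            # nudge the caller's value by up to one second either way,
            # never dipping below zero
            delay = random.uniform(max(delay - 1, 0), delay + 1)
        time.sleep(delay)
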
diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
index 5dd227d7ee..d37f0babba 100644
--- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
+++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from time import sleep
+import time
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -160,7 +160,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook):
 
                 self.log.info("Poke retry %s. Sleep time %s seconds. 
Sleeping...", num_tries, sleep_time)
 
-                sleep(sleep_time)
+                time.sleep(sleep_time)
 
                 sleep_time *= exponential_back_off_factor
 
@@ -240,7 +240,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook):
 
                 self.log.info("Poke retry %s. Sleep time %s seconds. 
Sleeping...", num_tries, sleep_time)
 
-                sleep(sleep_time)
+                time.sleep(sleep_time)
 
                 sleep_time *= exponential_back_off_factor
 
diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py
index 1ff2129277..17f85cc8a5 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -18,8 +18,8 @@
 from __future__ import annotations
 
 import json
+import time
 import warnings
-from time import sleep
 from typing import Any
 
 from botocore.exceptions import ClientError
@@ -509,7 +509,7 @@ class EmrContainerHook(AwsBaseHook):
                 final_query_state = query_state
                 break
             try_number += 1
-            sleep(poll_interval)
+            time.sleep(poll_interval)
         return final_query_state
 
     def stop_query(self, job_id: str) -> dict:
diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py
index 110ef9dad1..f7df0fd744 100644
--- a/airflow/providers/amazon/aws/hooks/redshift_data.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_data.py
@@ -17,8 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from pprint import pformat
-from time import sleep
 from typing import TYPE_CHECKING, Any, Iterable
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
@@ -127,7 +127,7 @@ class RedshiftDataHook(AwsGenericHook["RedshiftDataAPIServiceClient"]):
                 )
             else:
                 self.log.info("Query %s", status)
-            sleep(poll_interval)
+            time.sleep(poll_interval)
 
     def get_table_primary_key(
         self,
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 9eeeaf876b..f36c671c95 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -26,6 +26,7 @@ import logging
 import os
 import re
 import shutil
+import time
 import warnings
 from contextlib import suppress
 from copy import deepcopy
@@ -35,7 +36,6 @@ from inspect import signature
 from io import BytesIO
 from pathlib import Path
 from tempfile import NamedTemporaryFile, gettempdir
-from time import sleep
 from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
 from urllib.parse import urlsplit
 from uuid import uuid4
@@ -1289,7 +1289,7 @@ class S3Hook(AwsBaseHook):
                 if not bucket_keys:
                     break
                 if retry:  # Avoid first loop
-                    sleep(500)
+                    time.sleep(500)
 
                 self.delete_objects(bucket=bucket_name, keys=bucket_keys)
 
diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py
index 7b7402acde..184fc7fab1 100644
--- a/airflow/providers/amazon/aws/operators/appflow.py
+++ b/airflow/providers/amazon/aws/operators/appflow.py
@@ -16,10 +16,10 @@
 # under the License.
 from __future__ import annotations
 
+import time
 import warnings
 from datetime import datetime, timedelta
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, cast
 
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
@@ -107,7 +107,7 @@ class AppflowBaseOperator(BaseOperator):
             self._update_flow()
             # while schedule flows will pick up the update right away, on-demand flows might use out of date
             # info if triggered right after an update, so we need to wait a bit for the DB to be consistent.
-            sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
+            time.sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
 
         self._run_flow(context)
 
diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py
index bcf36b50ca..8f4fa75527 100644
--- a/airflow/providers/apache/livy/operators/livy.py
+++ b/airflow/providers/apache/livy/operators/livy.py
@@ -17,8 +17,8 @@
 """This module contains the Apache Livy operator."""
 from __future__ import annotations
 
+import time
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated.classic import deprecated
@@ -189,7 +189,7 @@ class LivyOperator(BaseOperator):
         state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args)
         while state not in self.hook.TERMINAL_STATES:
             self.log.debug("Batch with id %s is in state: %s", batch_id, 
state.value)
-            sleep(self._polling_interval)
+            time.sleep(self._polling_interval)
             state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args)
         self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value)
         self.hook.dump_batch_logs(batch_id)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index e227ca85ca..070d81543f 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -21,10 +21,10 @@ import contextlib
 import inspect
 import logging
 import sys
+import time
 import warnings
 from collections import defaultdict
 from operator import attrgetter
-from time import time
 from typing import TYPE_CHECKING, Any, Callable, List, Tuple
 from urllib.parse import quote, urlparse
 
@@ -372,7 +372,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
     def emit(self, record):
         if self.handler:
-            setattr(record, self.offset_field, int(time() * (10**9)))
+            setattr(record, self.offset_field, int(time.time() * (10**9)))
             self.handler.emit(record)
 
     def set_context(self, ti: TaskInstance) -> None:
diff --git a/airflow/providers/google/cloud/hooks/cloud_batch.py b/airflow/providers/google/cloud/hooks/cloud_batch.py
index ef642e4ec9..5ef56e4427 100644
--- a/airflow/providers/google/cloud/hooks/cloud_batch.py
+++ b/airflow/providers/google/cloud/hooks/cloud_batch.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import itertools
 import json
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Iterable, Sequence
 
 from google.cloud.batch import ListJobsRequest, ListTasksRequest
@@ -152,7 +152,7 @@ class CloudBatchHook(GoogleBaseHook):
                 ):
                     return job
                 else:
-                    sleep(polling_period_seconds)
+                    time.sleep(polling_period_seconds)
             except Exception as e:
                 self.log.exception("Exception occurred while checking for job 
completion.")
                 raise e
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index b3cfd6ee9b..3c8d1b7453 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 import asyncio
 import json
 import os
-from time import monotonic, sleep
+import time
 from typing import Any, Dict, Sequence
 from urllib.parse import quote, urlencode, urljoin
 
@@ -91,7 +91,7 @@ class DataFusionHook(GoogleBaseHook):
     def wait_for_operation(self, operation: dict[str, Any]) -> dict[str, Any]:
         """Waits for long-lasting operation to complete."""
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             operation = (
                 self.get_conn().projects().locations().operations().get(name=operation.get("name")).execute()
             )
@@ -115,9 +115,9 @@ class DataFusionHook(GoogleBaseHook):
         """Polls pipeline state and raises an exception if the state fails or 
times out."""
         failure_states = failure_states or FAILURE_STATES
         success_states = success_states or SUCCESS_STATES
-        start_time = monotonic()
+        start_time = time.monotonic()
         current_state = None
-        while monotonic() - start_time < timeout:
+        while time.monotonic() - start_time < timeout:
             try:
                 workflow = self.get_pipeline_workflow(
                     pipeline_name=pipeline_name,
@@ -135,7 +135,7 @@ class DataFusionHook(GoogleBaseHook):
                 raise AirflowException(
                     f"Pipeline {pipeline_name} state {current_state} is not 
one of {success_states}"
                 )
-            sleep(30)
+            time.sleep(30)
 
         # Time is up!
         raise AirflowException(
@@ -393,7 +393,7 @@ class DataFusionHook(GoogleBaseHook):
                 )
             except ConflictException as exc:
                 self.log.info(exc)
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
             else:
                 if response.status == 200:
                     break
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index e5d0dec5df..b2149495f9 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -17,7 +17,7 @@
 """This module contains Google DataFusion operators."""
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Any, Sequence
 
 from google.api_core.retry import exponential_sleep_generator
@@ -267,7 +267,7 @@ class CloudDataFusionCreateInstanceOperator(GoogleCloudBaseOperator):
             for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
                 if instance["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
                 instance = hook.get_instance(
                     instance_name=self.instance_name, location=self.location, project_id=self.project_id
                 )
diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py
index 3a85961d2d..4fdd9ab970 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -18,7 +18,7 @@
 
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.exceptions import AirflowException
@@ -165,7 +165,7 @@ class DataplexCreateTaskOperator(GoogleCloudBaseOperator):
                 )
                 if task["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
 
         return Task.to_dict(task)
 
@@ -534,7 +534,7 @@ class DataplexCreateLakeOperator(GoogleCloudBaseOperator):
                 )
                 if lake["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
         DataplexLakeLink.persist(
             context=context,
             task_instance=self,
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index c27c5dc314..af88728fb6 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -18,7 +18,7 @@
 """This module contains Google Dataproc Metastore operators."""
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Sequence
 
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -700,7 +700,7 @@ class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
         the SDK.
         """
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             service = hook.get_service(
                 region=self.region,
                 project_id=self.project_id,
@@ -986,7 +986,7 @@ class DataprocMetastoreRestoreServiceOperator(GoogleCloudBaseOperator):
         the SDK.
         """
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             service = hook.get_service(
                 region=self.region,
                 project_id=self.project_id,
diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py b/airflow/providers/microsoft/azure/operators/container_instances.py
index 73c082003e..c104995aff 100644
--- a/airflow/providers/microsoft/azure/operators/container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -18,8 +18,8 @@
 from __future__ import annotations
 
 import re
+import time
 from collections import namedtuple
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from azure.mgmt.containerinstance.models import (
@@ -348,7 +348,7 @@ class AzureContainerInstancesOperator(BaseOperator):
             except Exception:
                 self.log.exception("Exception while getting container groups")
 
-            sleep(1)
+            time.sleep(1)
 
     def _log_last(self, logs: list | None, last_line_logged: Any) -> Any | None:
         if logs:
diff --git a/tests/providers/amazon/aws/hooks/test_batch_client.py b/tests/providers/amazon/aws/hooks/test_batch_client.py
index e501da3c06..39dde3f141 100644
--- a/tests/providers/amazon/aws/hooks/test_batch_client.py
+++ b/tests/providers/amazon/aws/hooks/test_batch_client.py
@@ -427,7 +427,7 @@ class TestBatchClientDelays:
         assert result <= width
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_defaults(self, mock_sleep, mock_uniform):
         assert BatchClientHook.DEFAULT_DELAY_MIN == 1
         assert BatchClientHook.DEFAULT_DELAY_MAX == 10
@@ -439,21 +439,21 @@ class TestBatchClientDelays:
         mock_sleep.assert_called_once_with(0)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_zero(self, mock_sleep, mock_uniform):
         self.batch_client.delay(0)
         mock_uniform.assert_called_once_with(0, 1)  # in add_jitter
         mock_sleep.assert_called_once_with(mock_uniform.return_value)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_int(self, mock_sleep, mock_uniform):
         self.batch_client.delay(5)
         mock_uniform.assert_called_once_with(4, 6)  # in add_jitter
         mock_sleep.assert_called_once_with(mock_uniform.return_value)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_float(self, mock_sleep, mock_uniform):
         self.batch_client.delay(5.0)
         mock_uniform.assert_called_once_with(4.0, 6.0)  # in add_jitter
diff --git a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
index 2984fe60ee..35fdc2b8b9 100644
--- a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
+++ b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
@@ -345,7 +345,7 @@ class TestACIOperator:
         )
 
     @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
-    @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.sleep")
+    @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.time.sleep")
     def test_execute_correct_sleep_cycle(self, sleep_mock, aci_mock):
         expected_cg1 = make_mock_container(state="Running", exit_code=0, detail_status="test")
         expected_cg2 = make_mock_container(state="Terminated", exit_code=0, detail_status="test")
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index 6515fe6552..b521e3dedd 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -20,8 +20,8 @@ import inspect
 import json
 import logging
 import os
+import time
 from pathlib import Path
-from time import sleep
 from typing import TYPE_CHECKING
 from uuid import uuid4
 
@@ -331,7 +331,7 @@ def _purge_logs(
             if not retry or retry_times == 0 or e.response["Error"]["Code"] != "ResourceNotFoundException":
                 raise e
 
-            sleep(PURGE_LOGS_INTERVAL_PERIOD)
+            time.sleep(PURGE_LOGS_INTERVAL_PERIOD)
             _purge_logs(
                 test_logs=test_logs,
                 force_delete=force_delete,
