Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9b7525f25 -> fcd51f362


[AIRFLOW-2716] Replace async and await py3.7 keywords

Closes #3578 from JacobHayes/py37-keywords


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fcd51f36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fcd51f36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fcd51f36

Branch: refs/heads/master
Commit: fcd51f362c6804cb69dee8e2dd054304a0564dde
Parents: 9b7525f
Author: Jacob Hayes <jacob.r.ha...@gmail.com>
Authored: Sun Jul 29 11:56:41 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Sun Jul 29 11:56:41 2018 +0200

----------------------------------------------------------------------
 UPDATING.md                                       |  4 ++++
 airflow/contrib/hooks/gcp_dataproc_hook.py        | 12 +++++++++++-
 airflow/contrib/operators/dataproc_operator.py    |  2 +-
 airflow/executors/celery_executor.py              | 12 ++++++------
 tests/contrib/operators/test_dataproc_operator.py |  8 ++++----
 5 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 2ca421d..da80f56 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -5,6 +5,10 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Replace DataProcHook.await calls to DataProcHook.wait
+
+The method name was changed to be compatible with the Python 3.7 async/await 
keywords
+
 ### DAG level Access Control for new RBAC UI
 
 Extend and enhance new Airflow RBAC UI to support DAG level ACL. Each dag now 
has two permissions(one for write, one for read) associated('can_dag_edit', 
'can_dag_read').

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py 
b/airflow/contrib/hooks/gcp_dataproc_hook.py
index fc15137..8e4f32b 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -21,6 +21,7 @@ import time
 import uuid
 
 from apiclient.discovery import build
+from zope.deprecation import deprecation
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -224,7 +225,16 @@ class DataProcHook(GoogleCloudBaseHook):
         return _DataProcJobBuilder(self.project_id, task_id, cluster_name,
                                    job_type, properties)
 
-    def await(self, operation):
+    def wait(self, operation):
         """Awaits for Google Cloud Dataproc Operation to complete."""
         submitted = _DataProcOperation(self.get_conn(), operation)
         submitted.wait_for_done()
+
+
+setattr(
+    DataProcHook,
+    "await",
+    deprecation.deprecated(
+        DataProcHook.wait, "renamed to 'wait' for Python3.7 compatability"
+    ),
+)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py 
b/airflow/contrib/operators/dataproc_operator.py
index 23cfeb0..01d137f 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -1265,7 +1265,7 @@ class DataprocWorkflowTemplateBaseOperator(BaseOperator):
         )
 
     def execute(self, context):
-        self.hook.await(self.start())
+        self.hook.wait(self.start())
 
     def start(self, context):
         raise AirflowException('plese start a workflow operation')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py 
b/airflow/executors/celery_executor.py
index 6cfd2d3..481daa5 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -89,9 +89,9 @@ class CeleryExecutor(BaseExecutor):
 
     def sync(self):
         self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, async in list(self.tasks.items()):
+        for key, task in list(self.tasks.items()):
             try:
-                state = async.state
+                state = task.state
                 if self.last_state[key] != state:
                     if state == celery_states.SUCCESS:
                         self.success(key)
@@ -106,8 +106,8 @@ class CeleryExecutor(BaseExecutor):
                         del self.tasks[key]
                         del self.last_state[key]
                     else:
-                        self.log.info("Unexpected state: %s", async.state)
-                    self.last_state[key] = async.state
+                        self.log.info("Unexpected state: %s", task.state)
+                    self.last_state[key] = task.state
             except Exception as e:
                 self.log.error("Error syncing the celery executor, ignoring 
it:")
                 self.log.exception(e)
@@ -115,7 +115,7 @@ class CeleryExecutor(BaseExecutor):
     def end(self, synchronous=False):
         if synchronous:
             while any([
-                    async.state not in celery_states.READY_STATES
-                    for async in self.tasks.values()]):
+                    task.state not in celery_states.READY_STATES
+                    for task in self.tasks.values()]):
                 time.sleep(5)
         self.sync()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py 
b/tests/contrib/operators/test_dataproc_operator.py
index f584ba7..038cf61 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -569,7 +569,7 @@ class 
DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
         with patch(HOOK) as MockHook:
             hook = MockHook()
             hook.get_conn.return_value = self.mock_conn
-            hook.await.return_value = None
+            hook.wait.return_value = None
 
             dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
                 task_id=TASK_ID,
@@ -586,7 +586,7 @@ class 
DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
             self.mock_workflows.instantiate.assert_called_once_with(
                 name=template_name,
                 body=mock.ANY)
-            hook.await.assert_called_once_with(self.operation)
+            hook.wait.assert_called_once_with(self.operation)
 
 
 class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
@@ -617,7 +617,7 @@ class 
DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
         with patch(HOOK) as MockHook:
             hook = MockHook()
             hook.get_conn.return_value = self.mock_conn
-            hook.await.return_value = None
+            hook.wait.return_value = None
 
             template = {
                 "placement": {
@@ -652,4 +652,4 @@ class 
DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
                 parent='projects/test-project-id/regions/test-region',
                 instanceId=mock.ANY,
                 body=template)
-            hook.await.assert_called_once_with(self.operation)
+            hook.wait.assert_called_once_with(self.operation)

Reply via email to