Repository: incubator-airflow Updated Branches: refs/heads/master 9b7525f25 -> fcd51f362
[AIRFLOW-2716] Replace async and await py3.7 keywords Closes #3578 from JacobHayes/py37-keywords Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fcd51f36 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fcd51f36 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fcd51f36 Branch: refs/heads/master Commit: fcd51f362c6804cb69dee8e2dd054304a0564dde Parents: 9b7525f Author: Jacob Hayes <jacob.r.ha...@gmail.com> Authored: Sun Jul 29 11:56:41 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Sun Jul 29 11:56:41 2018 +0200 ---------------------------------------------------------------------- UPDATING.md | 4 ++++ airflow/contrib/hooks/gcp_dataproc_hook.py | 12 +++++++++++- airflow/contrib/operators/dataproc_operator.py | 2 +- airflow/executors/celery_executor.py | 12 ++++++------ tests/contrib/operators/test_dataproc_operator.py | 8 ++++---- 5 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 2ca421d..da80f56 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -5,6 +5,10 @@ assists users migrating to a new version. ## Airflow Master +### Replace DataProcHook.await calls with DataProcHook.wait + +The method name was changed to be compatible with the Python 3.7 async/await keywords. + ### DAG level Access Control for new RBAC UI Extend and enhance new Airflow RBAC UI to support DAG level ACL. Each DAG now has two permissions (one for write, one for read) associated ('can_dag_edit', 'can_dag_read'). 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index fc15137..8e4f32b 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -21,6 +21,7 @@ import time import uuid from apiclient.discovery import build +from zope.deprecation import deprecation from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.utils.log.logging_mixin import LoggingMixin @@ -224,7 +225,16 @@ class DataProcHook(GoogleCloudBaseHook): return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type, properties) - def await(self, operation): + def wait(self, operation): """Awaits for Google Cloud Dataproc Operation to complete.""" submitted = _DataProcOperation(self.get_conn(), operation) submitted.wait_for_done() + + +setattr( + DataProcHook, + "await", + deprecation.deprecated( + DataProcHook.wait, "renamed to 'wait' for Python3.7 compatability" + ), +) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 23cfeb0..01d137f 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -1265,7 +1265,7 @@ class DataprocWorkflowTemplateBaseOperator(BaseOperator): ) def execute(self, context): - self.hook.await(self.start()) + self.hook.wait(self.start()) def start(self, context): raise AirflowException('plese start a workflow operation') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/airflow/executors/celery_executor.py 
---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 6cfd2d3..481daa5 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -89,9 +89,9 @@ class CeleryExecutor(BaseExecutor): def sync(self): self.log.debug("Inquiring about %s celery task(s)", len(self.tasks)) - for key, async in list(self.tasks.items()): + for key, task in list(self.tasks.items()): try: - state = async.state + state = task.state if self.last_state[key] != state: if state == celery_states.SUCCESS: self.success(key) @@ -106,8 +106,8 @@ class CeleryExecutor(BaseExecutor): del self.tasks[key] del self.last_state[key] else: - self.log.info("Unexpected state: %s", async.state) - self.last_state[key] = async.state + self.log.info("Unexpected state: %s", task.state) + self.last_state[key] = task.state except Exception as e: self.log.error("Error syncing the celery executor, ignoring it:") self.log.exception(e) @@ -115,7 +115,7 @@ class CeleryExecutor(BaseExecutor): def end(self, synchronous=False): if synchronous: while any([ - async.state not in celery_states.READY_STATES - for async in self.tasks.values()]): + task.state not in celery_states.READY_STATES + for task in self.tasks.values()]): time.sleep(5) self.sync() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fcd51f36/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index f584ba7..038cf61 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -569,7 +569,7 @@ class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase): with patch(HOOK) as MockHook: hook = MockHook() hook.get_conn.return_value = self.mock_conn - 
hook.await.return_value = None + hook.wait.return_value = None dataproc_task = DataprocWorkflowTemplateInstantiateOperator( task_id=TASK_ID, @@ -586,7 +586,7 @@ class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase): self.mock_workflows.instantiate.assert_called_once_with( name=template_name, body=mock.ANY) - hook.await.assert_called_once_with(self.operation) + hook.wait.assert_called_once_with(self.operation) class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase): @@ -617,7 +617,7 @@ class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase): with patch(HOOK) as MockHook: hook = MockHook() hook.get_conn.return_value = self.mock_conn - hook.await.return_value = None + hook.wait.return_value = None template = { "placement": { @@ -652,4 +652,4 @@ class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase): parent='projects/test-project-id/regions/test-region', instanceId=mock.ANY, body=template) - hook.await.assert_called_once_with(self.operation) + hook.wait.assert_called_once_with(self.operation)