Repository: incubator-airflow

Updated Branches:
  refs/heads/v1-8-test 77715b9e7 -> 7e65998a1
Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator." This reverts commit dc97bcd3b7e0a7eebd838f0fb0452a0b47ba417b. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e65998a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e65998a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e65998a Branch: refs/heads/v1-8-test Commit: 7e65998a1bedd00e74fa333cfee78ad574aaa849 Parents: 77715b9 Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Wed Feb 1 15:56:14 2017 +0000 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed Feb 1 15:56:14 2017 +0000 ---------------------------------------------------------------------- airflow/contrib/hooks/gcp_dataflow_hook.py | 33 +++----- airflow/contrib/operators/dataflow_operator.py | 85 ++------------------- tests/contrib/hooks/gcp_dataflow_hook.py | 56 -------------- tests/contrib/operators/dataflow_operator.py | 76 ------------------ 4 files changed, 18 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/hooks/gcp_dataflow_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py index aaa9992..bd5bd3c 100644 --- a/airflow/contrib/hooks/gcp_dataflow_hook.py +++ b/airflow/contrib/hooks/gcp_dataflow_hook.py @@ -24,7 +24,6 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook class _DataflowJob(object): - def __init__(self, dataflow, project_number, name): self._dataflow = dataflow self._project_number = project_number @@ -83,8 +82,7 @@ class _DataflowJob(object): return self._job -class _Dataflow(object): - +class _DataflowJava(object): def __init__(self, cmd): self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -115,12 +113,11 @@ class _Dataflow(object): else: logging.info("Waiting for DataFlow process to complete.") if self._proc.returncode is not 0: - raise Exception("DataFlow failed with return code {}".format( + raise Exception("DataFlow jar failed with return code {}".format( self._proc.returncode)) class DataFlowHook(GoogleCloudBaseHook): - def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None): @@ -133,27 +130,21 @@ class DataFlowHook(GoogleCloudBaseHook): http_authorized = self._authorize() return build('dataflow', 'v1b3', http=http_authorized) - def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix): - cmd = command_prefix + self._build_cmd(task_id, variables, dataflow) - _Dataflow(cmd).wait_for_done() - _DataflowJob( - self.get_conn(), variables['project'], name).wait_for_done() - def start_java_dataflow(self, task_id, variables, dataflow): name = task_id + "-" + str(uuid.uuid1())[:8] - variables['jobName'] = name - self._start_dataflow( - task_id, variables, dataflow, name, ["java", "-jar"]) + cmd = self._build_cmd(task_id, variables, dataflow, name) + _DataflowJava(cmd).wait_for_done() + _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done() - def start_python_dataflow(self, task_id, variables, dataflow, py_options): - name = task_id + "-" + str(uuid.uuid1())[:8] - variables["job_name"] = name - self._start_dataflow( - task_id, variables, dataflow, name, ["python"] + py_options) + def _build_cmd(self, task_id, variables, 
dataflow, name): + command = ["java", "-jar", + dataflow, + "--runner=DataflowPipelineRunner", + "--streaming=false", + "--jobName=" + name] - def _build_cmd(self, task_id, variables, dataflow): - command = [dataflow, "--runner=DataflowPipelineRunner"] if variables is not None: for attr, value in variables.iteritems(): command.append("--" + attr + "=" + value) + return command http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/operators/dataflow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py index ef49eb6..10a6811 100644 --- a/airflow/contrib/operators/dataflow_operator.py +++ b/airflow/contrib/operators/dataflow_operator.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import re from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook from airflow.models import BaseOperator @@ -71,13 +70,9 @@ class DataFlowJavaOperator(BaseOperator): *args, **kwargs): """ - Create a new DataFlowJavaOperator. Note that both - dataflow_default_options and options will be merged to specify pipeline - execution parameter, and dataflow_default_options is expected to save - high-level options, for instances, project and zone information, which - apply to all dataflow operators in the DAG. + Create a new DataFlowJavaOperator. - For more detail on job submission have a look at the reference: + For more detail on about job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params @@ -87,12 +82,11 @@ class DataFlowJavaOperator(BaseOperator): :type dataflow_default_options: dict :param options: Map of job specific options. :type options: dict - :param gcp_conn_id: The connection ID to use connecting to Google Cloud - Platform. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have - domain-wide delegation enabled. + For this to work, the service account making the request must have domain-wide + delegation enabled. :type delegate_to: string """ super(DataFlowJavaOperator, self).__init__(*args, **kwargs) @@ -107,76 +101,9 @@ class DataFlowJavaOperator(BaseOperator): self.options = options def execute(self, context): - hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to) + hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) dataflow_options = copy.copy(self.dataflow_default_options) dataflow_options.update(self.options) hook.start_java_dataflow(self.task_id, dataflow_options, self.jar) - - -class DataFlowPythonOperator(BaseOperator): - - @apply_defaults - def __init__( - self, - py_file, - py_options=None, - dataflow_default_options=None, - options=None, - gcp_conn_id='google_cloud_default', - delegate_to=None, - *args, - **kwargs): - """ - Create a new DataFlowPythonOperator. Note that both - dataflow_default_options and options will be merged to specify pipeline - execution parameter, and dataflow_default_options is expected to save - high-level options, for instances, project and zone information, which - apply to all dataflow operators in the DAG. 
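
Note on the restored hook above: start_java_dataflow now builds the full launch
command itself (via _build_cmd), runs it through _DataflowJava, and then polls
the job with _DataflowJob. A minimal sketch of the command list that _build_cmd
assembles; the jar path, job name, and variable values below are illustrative,
not taken from this commit:

    # Reconstruction of the command list _build_cmd returns; all values
    # below are hypothetical examples.
    dataflow = "/tmp/example-pipeline.jar"   # hypothetical jar path
    name = "example-task-1a2b3c4d"           # task_id + first 8 chars of uuid1()
    variables = {"project": "example-project", "zone": "europe-west1-d"}

    command = ["java", "-jar",
               dataflow,
               "--runner=DataflowPipelineRunner",
               "--streaming=false",
               "--jobName=" + name]
    if variables is not None:
        # The hook itself uses iteritems() (Python 2); items() is the
        # portable equivalent for this sketch.
        for attr, value in variables.items():
            command.append("--" + attr + "=" + value)

    print(" ".join(command))
    # java -jar /tmp/example-pipeline.jar --runner=DataflowPipelineRunner
    # --streaming=false --jobName=example-task-1a2b3c4d
    # --project=example-project --zone=europe-west1-d  (flag order may vary)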

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index ef49eb6..10a6811 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import copy
-import re
 
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
@@ -71,13 +70,9 @@ class DataFlowJavaOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataFlowJavaOperator. Note that both
-        dataflow_default_options and options will be merged to specify pipeline
-        execution parameter, and dataflow_default_options is expected to save
-        high-level options, for instances, project and zone information, which
-        apply to all dataflow operators in the DAG.
+        Create a new DataFlowJavaOperator.
 
-        For more detail on job submission have a look at the reference:
+        For more detail on about job submission have a look at the reference:
 
         https://cloud.google.com/dataflow/pipelines/specifying-exec-params
 
@@ -87,12 +82,11 @@ class DataFlowJavaOperator(BaseOperator):
         :type dataflow_default_options: dict
         :param options: Map of job specific options.
         :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
-            Platform.
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
         :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have
-            domain-wide delegation enabled.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
         :type delegate_to: string
         """
         super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -107,76 +101,9 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
-                            delegate_to=self.delegate_to)
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
 
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)
 
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
-
-
-class DataFlowPythonOperator(BaseOperator):
-
-    @apply_defaults
-    def __init__(
-            self,
-            py_file,
-            py_options=None,
-            dataflow_default_options=None,
-            options=None,
-            gcp_conn_id='google_cloud_default',
-            delegate_to=None,
-            *args,
-            **kwargs):
-        """
-        Create a new DataFlowPythonOperator. Note that both
-        dataflow_default_options and options will be merged to specify pipeline
-        execution parameter, and dataflow_default_options is expected to save
-        high-level options, for instances, project and zone information, which
-        apply to all dataflow operators in the DAG.
-
-        For more detail on job submission have a look at the reference:
-
-        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
-
-        :param py_file: Reference to the python dataflow pipleline file, e.g.,
-            /some/local/file/path/to/your/python/pipeline/file.py.
-        :type py_file: string
-        :param py_options: Additional python options.
-        :type pyt_options: list of strings, e.g., ["-m", "-v"].
-        :param dataflow_default_options: Map of default job options.
-        :type dataflow_default_options: dict
-        :param options: Map of job specific options.
-        :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
-            Platform.
-        :type gcp_conn_id: string
-        :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have
-            domain-wide delegation enabled.
-        :type delegate_to: string
-        """
-        super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
-
-        self.py_file = py_file
-        self.py_options = py_options or []
-        self.dataflow_default_options = dataflow_default_options or {}
-        self.options = options or {}
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-
-    def execute(self, context):
-        """Execute the python dataflow job."""
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
-                            delegate_to=self.delegate_to)
-        dataflow_options = self.dataflow_default_options.copy()
-        dataflow_options.update(self.options)
-        # Convert argument names from lowerCamelCase to snake case.
-        camel_to_snake = lambda name: re.sub(
-            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
-        formatted_options = {camel_to_snake(key): dataflow_options[key]
-                             for key in dataflow_options}
-        hook.start_python_dataflow(
-            self.task_id, formatted_options,
-            self.py_file, self.py_options)
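
With DataFlowPythonOperator reverted, DataFlowJavaOperator is again the only
Dataflow operator on this branch. A minimal usage sketch follows; the DAG id,
jar path, and option values are illustrative, not from this commit. Note that
execute() copies dataflow_default_options and then applies options on top, so
job-specific values win on a key clash:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    dag = DAG('example_dataflow', start_date=datetime(2017, 1, 1),
              schedule_interval=None)

    task = DataFlowJavaOperator(
        task_id='example-java-dataflow',
        jar='/tmp/example-pipeline.jar',          # hypothetical local jar
        dataflow_default_options={'project': 'example-project',
                                  'zone': 'europe-west1-d'},
        options={'output': 'gs://example-bucket/output'},
        dag=dag)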

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
deleted file mode 100644
index 797d40c..0000000
--- a/tests/contrib/hooks/gcp_dataflow_hook.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-OPTIONS = {
-    'project': 'test',
-    'staging_location': 'gs://test/staging'
-}
-BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
-DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
-
-
-def mock_init(self, gcp_conn_id, delegate_to=None):
-    pass
-
-
-class DataFlowHookTest(unittest.TestCase):
-
-    def setUp(self):
-        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
-                        new=mock_init):
-            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
-
-    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
-    def test_start_python_dataflow(self, internal_dataflow_mock):
-        self.dataflow_hook.start_python_dataflow(
-            task_id=TASK_ID, variables=OPTIONS,
-            dataflow=PY_FILE, py_options=PY_OPTIONS)
-        internal_dataflow_mock.assert_called_once_with(
-            TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
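
The deleted test above exercised the private _start_dataflow helper, which this
revert removes, so the test has to go with it. For reference, a hypothetical
equivalent against the surviving start_java_dataflow would patch the two helper
classes instead. This is a sketch only, not part of the commit; the test and
class names are made up, and the jar path is illustrative:

    import unittest

    import mock

    from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

    BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
    DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'


    def mock_init(self, gcp_conn_id, delegate_to=None):
        pass  # bypass credential handling in the base hook


    class DataFlowJavaHookTest(unittest.TestCase):  # hypothetical test
        def setUp(self):
            with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
                            new=mock_init):
                self.hook = DataFlowHook(gcp_conn_id='test')

        @mock.patch(DATAFLOW_STRING.format('_DataflowJob'))
        @mock.patch(DATAFLOW_STRING.format('_DataflowJava'))
        @mock.patch(DATAFLOW_STRING.format('DataFlowHook.get_conn'))
        def test_start_java_dataflow(self, conn_mock, java_mock, job_mock):
            self.hook.start_java_dataflow(
                task_id='test-java-dataflow',
                variables={'project': 'test'},
                dataflow='/tmp/example.jar')    # hypothetical jar path
            assert java_mock.called
            assert job_mock.called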

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
deleted file mode 100644
index 4f887c1..0000000
--- a/tests/contrib/operators/dataflow_operator.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-DEFAULT_OPTIONS = {
-    'project': 'test',
-    'stagingLocation': 'gs://test/staging'
-}
-ADDITIONAL_OPTIONS = {
-    'output': 'gs://test/output'
-}
-
-
-class DataFlowPythonOperatorTest(unittest.TestCase):
-
-    def setUp(self):
-        self.dataflow = DataFlowPythonOperator(
-            task_id=TASK_ID,
-            py_file=PY_FILE,
-            py_options=PY_OPTIONS,
-            dataflow_default_options=DEFAULT_OPTIONS,
-            options=ADDITIONAL_OPTIONS)
-
-    def test_init(self):
-        """Test DataFlowPythonOperator instance is properly initialized."""
-        self.assertEqual(self.dataflow.task_id, TASK_ID)
-        self.assertEqual(self.dataflow.py_file, PY_FILE)
-        self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
-        self.assertEqual(self.dataflow.dataflow_default_options,
-                         DEFAULT_OPTIONS)
-        self.assertEqual(self.dataflow.options,
-                         ADDITIONAL_OPTIONS)
-
-    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
-    def test_exec(self, dataflow_mock):
-        """Test DataFlowHook is created and the right args are passed to
-        start_python_workflow.
-
-        """
-        start_python_hook = dataflow_mock.return_value.start_python_dataflow
-        self.dataflow.execute(None)
-        assert dataflow_mock.called
-        expected_options = {
-            'project': 'test',
-            'staging_location': 'gs://test/staging',
-            'output': 'gs://test/output'
-        }
-        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
-                                                  PY_FILE, PY_OPTIONS)
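
One detail worth spelling out in the deleted operator test above: setUp passes
'stagingLocation' while test_exec expects 'staging_location', because the
removed DataFlowPythonOperator.execute() converted lowerCamelCase option names
to snake_case before calling the hook. Rerunning that conversion standalone
(the lambda is copied from the removed execute() shown earlier; only the
print call is added):

    import re

    # lowerCamelCase -> snake_case, as in the removed operator
    camel_to_snake = lambda name: re.sub(
        r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)

    dataflow_options = {'project': 'test', 'stagingLocation': 'gs://test/staging'}
    dataflow_options.update({'output': 'gs://test/output'})

    formatted_options = {camel_to_snake(key): dataflow_options[key]
                         for key in dataflow_options}
    print(formatted_options)
    # {'project': 'test', 'staging_location': 'gs://test/staging',
    #  'output': 'gs://test/output'}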