Repository: incubator-airflow Updated Branches: refs/heads/master 6270dcf93 -> 06e70e2d1
[AIRFLOW-173] Initial implementation of FileSensor Implement a simple FileSensor to detect the creation of files and folder in a given filesystem (a la HDFSSensor) Closes #1543 from trixpan/AIRFLOW-173 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06e70e2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06e70e2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06e70e2d Branch: refs/heads/master Commit: 06e70e2d13c47e1e4bbc4a9146bc93a02b45f98c Parents: 6270dcf Author: Andre F de Miranda <[email protected]> Authored: Wed Jun 15 22:02:44 2016 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jun 15 22:02:44 2016 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/__init__.py | 3 +- airflow/contrib/hooks/fs_hook.py | 41 +++++++++++++++++ airflow/contrib/operators/__init__.py | 3 +- airflow/contrib/operators/fs_operator.py | 57 ++++++++++++++++++++++++ airflow/utils/db.py | 4 ++ airflow/www/views.py | 1 + tests/contrib/operators/__init__.py | 16 +++++++ tests/contrib/operators/fs_operator.py | 64 +++++++++++++++++++++++++++ 8 files changed, 187 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/contrib/hooks/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py index 17a9f29..29bb44a 100644 --- a/airflow/contrib/hooks/__init__.py +++ b/airflow/contrib/hooks/__init__.py @@ -27,7 +27,8 @@ _hooks = { 'gcs_hook': ['GoogleCloudStorageHook'], 'datastore_hook': ['DatastoreHook'], 'gcp_dataproc_hook': ['DataProcHook'], - 'cloudant_hook': ['CloudantHook'] + 'cloudant_hook': ['CloudantHook'], + 'fs_hook': ['FSHook'] } 
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from airflow.hooks.base_hook import BaseHook


class FSHook(BaseHook):
    """
    Allows for interaction with a file server.

    The connection should have a base path specified under its
    ``extra`` JSON field, for example:

        Conn Id: fs_test
        Conn Type: File (path)
        Host, Schema, Login, Password, Port: empty
        Extra: {"path": "/tmp"}

    :param conn_id: reference to the File (path) connection id
    :type conn_id: string
    """

    def __init__(self, conn_id='fs_default'):
        conn = self.get_connection(conn_id)
        # Fall back to '' when the connection's extra has no "path" key.
        self.basepath = conn.extra_dejson.get('path', '')
        self.conn = conn

    def get_conn(self):
        # A local filesystem has no live connection object to hand out;
        # this no-op only satisfies the BaseHook interface.
        pass

    def get_path(self):
        """Return the base path configured on the connection."""
        return self.basepath
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks import FSHook
from airflow.utils.decorators import apply_defaults


class FileSensor(BaseSensorOperator):
    """
    Waits for a file or folder to land in a filesystem.

    :param fs_conn_id: reference to the File (path) connection id.
        Defaults to ``fs_default``, the connection seeded by
        ``airflow initdb`` (the previous default, ``fs_default2``,
        matched no seeded connection and failed out of the box).
    :type fs_conn_id: string
    :param filepath: file or folder name (relative to
        the base path set within the connection)
    :type filepath: string
    """
    template_fields = ('filepath',)

    @apply_defaults
    def __init__(
            self,
            filepath,
            fs_conn_id='fs_default',
            *args, **kwargs):
        super(FileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id

    def poke(self, context):
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = "/".join([basepath, self.filepath])
        logging.info('Poking for file %s', full_path)
        # NOTE: os.walk() on a missing path yields nothing and does NOT
        # raise (its default onerror=None swallows errors), so the old
        # try/except around walk() made poke() succeed unconditionally.
        # Test for existence explicitly instead.
        return os.path.exists(full_path)
Known event types KET = models.KnownEventType http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index b468bc1..e91479f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2263,6 +2263,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): } form_choices = { 'conn_type': [ + ('fs', 'File (path)'), ('ftp', 'FTP',), ('google_cloud_platform', 'Google Cloud Platform'), ('hdfs', 'HDFS',), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/tests/contrib/operators/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/__init__.py b/tests/contrib/operators/__init__.py index 9c0779f..6e38bea 100644 --- a/tests/contrib/operators/__init__.py +++ b/tests/contrib/operators/__init__.py @@ -1,2 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + from __future__ import absolute_import from .ssh_execute_operator import * +from .fs_operator import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/tests/contrib/operators/fs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/fs_operator.py b/tests/contrib/operators/fs_operator.py new file mode 100644 index 0000000..d7f92aa --- /dev/null +++ b/tests/contrib/operators/fs_operator.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import unittest
from datetime import datetime

from airflow import configuration
from airflow.settings import Session
from airflow import models, DAG
from airflow.contrib.operators.fs_operator import FileSensor

TEST_DAG_ID = 'unit_tests'
DEFAULT_DATE = datetime(2015, 1, 1)
configuration.test_mode()


def reset(dag_id=TEST_DAG_ID):
    """Remove any task instances left over from earlier runs of *dag_id*."""
    session = Session()
    session.query(models.TaskInstance).filter_by(dag_id=dag_id).delete()
    session.commit()
    session.close()

reset()

class FileSensorTest(unittest.TestCase):
    """Exercises FileSensor against a path that is known to exist."""

    def setUp(self):
        configuration.test_mode()
        from airflow.contrib.hooks.fs_hook import FSHook
        default_args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE,
            'provide_context': True
        }
        test_dag = DAG(
            TEST_DAG_ID + 'test_schedule_dag_once',
            default_args=default_args)
        test_dag.schedule_interval = '@once'
        self.hook = FSHook()
        self.dag = test_dag

    def test_simple(self):
        sensor = FileSensor(
            task_id="test",
            filepath="etc/hosts",
            fs_conn_id='fs_default',
            _hook=self.hook,
            dag=self.dag,
        )
        sensor.run(
            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

if __name__ == '__main__':
    unittest.main()
