Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6270dcf93 -> 06e70e2d1


[AIRFLOW-173] Initial implementation of FileSensor

Implement a simple FileSensor to detect the creation of files and folders
in a given filesystem (a la HDFSSensor)

Closes #1543 from trixpan/AIRFLOW-173


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06e70e2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06e70e2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06e70e2d

Branch: refs/heads/master
Commit: 06e70e2d13c47e1e4bbc4a9146bc93a02b45f98c
Parents: 6270dcf
Author: Andre F de Miranda <[email protected]>
Authored: Wed Jun 15 22:02:44 2016 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Wed Jun 15 22:02:44 2016 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py        |  3 +-
 airflow/contrib/hooks/fs_hook.py         | 41 +++++++++++++++++
 airflow/contrib/operators/__init__.py    |  3 +-
 airflow/contrib/operators/fs_operator.py | 57 ++++++++++++++++++++++++
 airflow/utils/db.py                      |  4 ++
 airflow/www/views.py                     |  1 +
 tests/contrib/operators/__init__.py      | 16 +++++++
 tests/contrib/operators/fs_operator.py   | 64 +++++++++++++++++++++++++++
 8 files changed, 187 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py 
b/airflow/contrib/hooks/__init__.py
index 17a9f29..29bb44a 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -27,7 +27,8 @@ _hooks = {
     'gcs_hook': ['GoogleCloudStorageHook'],
     'datastore_hook': ['DatastoreHook'],
     'gcp_dataproc_hook': ['DataProcHook'],
-    'cloudant_hook': ['CloudantHook']
+    'cloudant_hook': ['CloudantHook'],
+    'fs_hook': ['FSHook']
 }
 
 _import_module_attrs(globals(), _hooks)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/contrib/hooks/fs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/fs_hook.py b/airflow/contrib/hooks/fs_hook.py
new file mode 100644
index 0000000..bee60ce
--- /dev/null
+++ b/airflow/contrib/hooks/fs_hook.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+
+
class FSHook(BaseHook):
    """
    Allows for interaction with a file server.

    The connection should have a name and a base path specified under
    extra, for example:

    Conn Id: fs_test
    Conn Type: File (path)
    Host, Schema, Login, Password, Port: empty
    Extra: {"path": "/tmp"}

    :param conn_id: reference to the File (path) connection
        (``fs_default`` is created by ``airflow initdb``)
    """

    def __init__(self, conn_id='fs_default'):
        # Resolve the connection once and cache the base path from its
        # JSON "extra" field; falls back to '' when no path is configured.
        conn = self.get_connection(conn_id)
        self.basepath = conn.extra_dejson.get('path', '')
        self.conn = conn

    def get_conn(self):
        # The local filesystem needs no client object; this exists only to
        # satisfy the BaseHook interface.
        pass

    def get_path(self):
        """Return the base path configured in the connection's extra field."""
        return self.basepath

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py 
b/airflow/contrib/operators/__init__.py
index 3598490..9f758bf 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -6,7 +6,8 @@ _operators = {
     'ssh_execute_operator': ['SSHExecuteOperator'],
     'vertica_operator': ['VerticaOperator'],
     'vertica_to_hive': ['VerticaToHiveTransfer'],
-    'qubole_operator': ['QuboleOperator']
+    'qubole_operator': ['QuboleOperator'],
+    'fs': ['FileSensor']
 }
 
 _import_module_attrs(globals(), _operators)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py 
b/airflow/contrib/operators/fs_operator.py
new file mode 100644
index 0000000..c68eed2
--- /dev/null
+++ b/airflow/contrib/operators/fs_operator.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from os import walk
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks import FSHook
+from airflow.utils.decorators import apply_defaults
+
class FileSensor(BaseSensorOperator):
    """
    Waits for a file or folder to land in a filesystem.

    :param fs_conn_id: reference to the File (path)
        connection id
    :type fs_conn_id: string
    :param filepath: File or folder name (relative to
        the base path set within the connection)
    :type filepath: string
    """
    template_fields = ('filepath',)

    @apply_defaults
    def __init__(
            self,
            filepath,
            # BUGFIX: was 'fs_default2', which no initdb ever creates;
            # 'fs_default' is the connection created in airflow/utils/db.py.
            fs_conn_id='fs_default',
            *args, **kwargs):
        super(FileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id

    def poke(self, context):
        import os  # local import keeps the module-level imports unchanged

        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = "/".join([basepath, self.filepath])
        logging.info(
            'Poking for file {full_path} '.format(**locals()))
        # BUGFIX: os.walk() never raises for a missing path -- it silently
        # yields nothing -- so the previous try/except around list(walk(...))
        # always returned True. Also, walk() only traverses directories and
        # cannot detect plain files. An explicit existence check handles both
        # files and folders correctly.
        return os.path.exists(full_path)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7045f73..e165e2e 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -182,6 +182,10 @@ def initdb():
         models.Connection(
             conn_id='ssh_default', conn_type='ssh',
             host='localhost'))
+    merge_conn(
+        models.Connection(
+            conn_id='fs_default', conn_type='fs',
+            extra='{"path": "/"}'))
 
     # Known event types
     KET = models.KnownEventType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b468bc1..e91479f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2263,6 +2263,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, 
AirflowModelView):
     }
     form_choices = {
         'conn_type': [
+            ('fs', 'File (path)'),
             ('ftp', 'FTP',),
             ('google_cloud_platform', 'Google Cloud Platform'),
             ('hdfs', 'HDFS',),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/tests/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/__init__.py 
b/tests/contrib/operators/__init__.py
index 9c0779f..6e38bea 100644
--- a/tests/contrib/operators/__init__.py
+++ b/tests/contrib/operators/__init__.py
@@ -1,2 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from __future__ import absolute_import
 from .ssh_execute_operator import *
+from .fs_operator import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06e70e2d/tests/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/fs_operator.py 
b/tests/contrib/operators/fs_operator.py
new file mode 100644
index 0000000..d7f92aa
--- /dev/null
+++ b/tests/contrib/operators/fs_operator.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from datetime import datetime
+
+from airflow import configuration
+from airflow.settings import Session
+from airflow import models, DAG
+from airflow.contrib.operators.fs_operator import FileSensor
+
TEST_DAG_ID = 'unit_tests'
DEFAULT_DATE = datetime(2015, 1, 1)
# Switch Airflow to its unit-test configuration before any Session/DAG
# objects are created by the module-level code below.
configuration.test_mode()
+
+
def reset(dag_id=TEST_DAG_ID):
    """Delete all task instances of *dag_id* left over from earlier runs."""
    session = Session()
    session.query(models.TaskInstance).filter_by(dag_id=dag_id).delete()
    session.commit()
    session.close()

reset()
+
class FileSensorTest(unittest.TestCase):
    # Smoke test: runs FileSensor once against the default 'fs_default'
    # connection (base path "/") looking for /etc/hosts.
    def setUp(self):
        configuration.test_mode()
        # Imported here so that test_mode() above takes effect before the
        # hook touches the connection table.
        from airflow.contrib.hooks.fs_hook import FSHook
        hook = FSHook()
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE,
            'provide_context': True
        }
        dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args)
        dag.schedule_interval = '@once'
        self.hook = hook
        self.dag = dag

    def test_simple(self):
        # NOTE(review): FileSensor declares no ``_hook`` parameter, so this
        # keyword falls through to BaseOperator's **kwargs and is unused by
        # the sensor -- confirm whether it was meant to inject self.hook.
        task = FileSensor(
            task_id="test",
            filepath="etc/hosts",
            fs_conn_id='fs_default',
            _hook=self.hook,
            dag=self.dag,
        )
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

if __name__ == '__main__':
    unittest.main()

Reply via email to