Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 88ccad5da -> 879e00a84


[AIRFLOW-2389] Create a pinot db api hook

Closes #3274 from feng-tao/pinot_db_hook

(cherry picked from commit 700c0f488f5f08772983584b2b635973618548b5)
Signed-off-by: Fokko Driesprong <fokkodriespr...@godatadriven.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/879e00a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/879e00a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/879e00a8

Branch: refs/heads/v1-10-test
Commit: 879e00a840c99e127f7f60916beac7727a8a0897
Parents: 88ccad5
Author: Tao feng <tf...@lyft.com>
Authored: Mon Apr 30 08:41:43 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 30 08:42:05 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/pinot_hook.py    | 105 ++++++++++++++++++++++++++++
 docs/code.rst                          |   1 +
 setup.py                               |   6 +-
 tests/contrib/hooks/test_pinot_hook.py |  76 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/airflow/contrib/hooks/pinot_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
new file mode 100644
index 0000000..d731211
--- /dev/null
+++ b/airflow/contrib/hooks/pinot_hook.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import six
+
+from pinotdb import connect
+
+from airflow.hooks.dbapi_hook import DbApiHook
+
+
+class PinotDbApiHook(DbApiHook):
+    """
+    Connect to pinot db (https://github.com/linkedin/pinot) to issue pql.
+
+    The Airflow connection id is read from the attribute named by
+    ``conn_name_attr`` (``pinot_broker_conn_id``), falling back to
+    ``default_conn_name``. Only the read helpers ``get_records`` and
+    ``get_first`` are implemented; the write/pandas helpers inherited from
+    DbApiHook raise ``NotImplementedError``.
+    """
+    conn_name_attr = 'pinot_broker_conn_id'
+    default_conn_name = 'pinot_broker_default'
+    supports_autocommit = False
+
+    def get_conn(self):
+        """
+        Establish a connection to the pinot broker through the pinotdb
+        DB-API driver.
+
+        ``host`` and ``port`` come from the Airflow connection; the broker
+        path and scheme come from the connection extras ``endpoint``
+        (default ``/pql``) and ``schema`` (default ``http``).
+
+        :return: the connection object returned by ``pinotdb.connect``
+        """
+        conn = self.get_connection(getattr(self, self.conn_name_attr))
+        pinot_broker_conn = connect(
+            host=conn.host,
+            port=conn.port,
+            path=conn.extra_dejson.get('endpoint', '/pql'),
+            scheme=conn.extra_dejson.get('schema', 'http')
+        )
+        self.log.info('Get the connection to pinot broker on %s', conn.host)
+        return pinot_broker_conn
+
+    def get_uri(self):
+        """
+        Get the connection uri for pinot broker.
+
+        e.g: http://localhost:9000/pql
+
+        The scheme is the connection's ``conn_type`` (``http`` when unset),
+        and the path is the ``endpoint`` extra (default ``pql``).
+        """
+        conn = self.get_connection(getattr(self, self.conn_name_attr))
+        host = conn.host
+        if conn.port is not None:
+            host += ':{port}'.format(port=conn.port)
+        conn_type = conn.conn_type or 'http'
+        # get_conn defaults the endpoint to '/pql'; drop any leading slash so
+        # such an endpoint does not yield 'host//pql'.
+        endpoint = conn.extra_dejson.get('endpoint', 'pql').lstrip('/')
+        return '{conn_type}://{host}/{endpoint}'.format(
+            conn_type=conn_type, host=host, endpoint=endpoint)
+
+    def _run_query(self, sql, fetch):
+        """
+        Execute a single statement and return ``fetch(cursor)``.
+
+        :param sql: the statement to be executed
+        :type sql: str
+        :param fetch: callable applied to the cursor after ``execute``
+        """
+        if six.PY2 and isinstance(sql, six.text_type):
+            # Only unicode needs encoding on Python 2: calling .encode on an
+            # already-encoded byte string containing non-ASCII characters
+            # first decodes it as ASCII and raises UnicodeDecodeError.
+            sql = sql.encode('utf-8')
+
+        # The connection is used as a context manager whose __enter__ value
+        # serves as the cursor (presumably pinotdb's Connection.__enter__
+        # returns a cursor -- confirm against the installed pinotdb).
+        with self.get_conn() as cur:
+            cur.execute(sql)
+            return fetch(cur)
+
+    def get_records(self, sql):
+        """
+        Executes the sql and returns a set of records.
+
+        :param sql: the sql statement to be executed (a single statement;
+            lists of statements are not supported)
+        :type sql: str
+        :return: every row returned by the cursor's ``fetchall``
+        """
+        return self._run_query(sql, lambda cur: cur.fetchall())
+
+    def get_first(self, sql):
+        """
+        Executes the sql and returns the first resulting row.
+
+        :param sql: the sql statement to be executed (a single statement;
+            lists of statements are not supported)
+        :type sql: str
+        :return: the row returned by the cursor's ``fetchone``
+        """
+        return self._run_query(sql, lambda cur: cur.fetchone())
+
+    def set_autocommit(self, conn, autocommit):
+        raise NotImplementedError(
+            'set_autocommit is not supported by PinotDbApiHook')
+
+    def get_pandas_df(self, sql, parameters=None):
+        raise NotImplementedError(
+            'get_pandas_df is not supported by PinotDbApiHook')
+
+    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
+        raise NotImplementedError(
+            'insert_rows is not supported by PinotDbApiHook')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 16867f6..c979f26 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -365,6 +365,7 @@ Community contributed hooks
 .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
 .. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook
 .. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook
+.. autoclass:: airflow.contrib.hooks.pinot_hook.PinotDbApiHook
 .. autoclass:: airflow.contrib.hooks.qubole_hook.QuboleHook
 .. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook
 .. autoclass:: airflow.contrib.hooks.redshift_hook.RedshiftHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index fef8d8a..1842a14 100644
--- a/setup.py
+++ b/setup.py
@@ -151,6 +151,7 @@ mysql = ['mysqlclient>=1.3.6']
 rabbitmq = ['librabbitmq>=1.6.1']
 oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
+pinot = ['pinotdb>=0.1.1']
 ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
 s3 = ['boto3>=1.7.0']
@@ -177,7 +178,7 @@ snowflake = ['snowflake-connector-python>=1.5.2',
              'snowflake-sqlalchemy>=1.1.0']
 zendesk = ['zdesk']
 
-all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid
+all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid + pinot
 devel = [
     'click',
     'freezegun',
@@ -200,7 +201,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
              zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
-             druid + snowflake + elasticsearch)
+             druid + pinot + snowflake + elasticsearch)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -293,6 +294,7 @@ def do_setup():
             'mysql': mysql,
             'oracle': oracle,
             'password': password,
+            'pinot': pinot,
             'postgres': postgres,
             'qds': qds,
             'rabbitmq': rabbitmq,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/tests/contrib/hooks/test_pinot_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py
new file mode 100644
index 0000000..d63ee1c
--- /dev/null
+++ b/tests/contrib/hooks/test_pinot_hook.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+import unittest
+
+from airflow.contrib.hooks.pinot_hook import PinotDbApiHook
+
+
+class TestPinotDbApiHook(unittest.TestCase):
+    """Unit tests for PinotDbApiHook against a mocked pinot connection."""
+
+    def setUp(self):
+        super(TestPinotDbApiHook, self).setUp()
+        # One mock stands in for both the Airflow Connection (host, port,
+        # conn_type, extras) and the DB-API connection, whose context-manager
+        # entry yields self.cur.
+        self.conn = conn = mock.MagicMock()
+        self.conn.host = 'host'
+        self.conn.port = '1000'
+        self.conn.conn_type = 'http'
+        self.conn.extra_dejson = {'endpoint': 'pql'}
+        self.cur = mock.MagicMock()
+        self.conn.__enter__.return_value = self.cur
+        self.conn.__exit__.return_value = None
+
+        class TestPinotDBApiHook(PinotDbApiHook):
+            # Override the connection lookup and the driver connect so the
+            # tests never reach the metadata database or a real broker.
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = TestPinotDBApiHook
+
+    def test_get_uri(self):
+        """
+        Test on getting a pinot connection uri
+        """
+        db_hook = self.db_hook()
+        self.assertEqual(db_hook.get_uri(), 'http://host:1000/pql')
+
+    def test_get_conn(self):
+        """
+        Test on getting a pinot connection
+        """
+        conn = self.db_hook().get_conn()
+        self.assertEqual(conn.host, 'host')
+        self.assertEqual(conn.port, '1000')
+        self.assertEqual(conn.conn_type, 'http')
+        self.assertEqual(conn.extra_dejson.get('endpoint'), 'pql')
+
+    def test_get_records(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+        self.assertEqual(result_sets, self.db_hook().get_records(statement))
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_first(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+        self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
+        self.cur.execute.assert_called_once_with(statement)

Reply via email to