Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test 88ccad5da -> 879e00a84
[AIRFLOW-2389] Create a pinot db api hook Closes #3274 from feng-tao/pinot_db_hook (cherry picked from commit 700c0f488f5f08772983584b2b635973618548b5) Signed-off-by: Fokko Driesprong <fokkodriespr...@godatadriven.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/879e00a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/879e00a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/879e00a8 Branch: refs/heads/v1-10-test Commit: 879e00a840c99e127f7f60916beac7727a8a0897 Parents: 88ccad5 Author: Tao feng <tf...@lyft.com> Authored: Mon Apr 30 08:41:43 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Mon Apr 30 08:42:05 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/pinot_hook.py | 105 ++++++++++++++++++++++++++++ docs/code.rst | 1 + setup.py | 6 +- tests/contrib/hooks/test_pinot_hook.py | 76 ++++++++++++++++++++ 4 files changed, 186 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/airflow/contrib/hooks/pinot_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py new file mode 100644 index 0000000..d731211 --- /dev/null +++ b/airflow/contrib/hooks/pinot_hook.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import six + +from pinotdb import connect + +from airflow.hooks.dbapi_hook import DbApiHook + + +class PinotDbApiHook(DbApiHook): + """ + Connect to pinot db(https://github.com/linkedin/pinot) to issue pql + """ + conn_name_attr = 'pinot_broker_conn_id' + default_conn_name = 'pinot_broker_default' + supports_autocommit = False + + def __init__(self, *args, **kwargs): + super(PinotDbApiHook, self).__init__(*args, **kwargs) + + def get_conn(self): + """ + Establish a connection to pinot broker through pinot dbapi. + """ + conn = self.get_connection(self.pinot_broker_conn_id) + pinot_broker_conn = connect( + host=conn.host, + port=conn.port, + path=conn.extra_dejson.get('endpoint', '/pql'), + scheme=conn.extra_dejson.get('schema', 'http') + ) + self.log.info('Get the connection to pinot ' + 'broker on {host}'.format(host=conn.host)) + return pinot_broker_conn + + def get_uri(self): + """ + Get the connection uri for pinot broker. + + e.g: http://localhost:9000/pql + """ + conn = self.get_connection(getattr(self, self.conn_name_attr)) + host = conn.host + if conn.port is not None: + host += ':{port}'.format(port=conn.port) + conn_type = 'http' if not conn.conn_type else conn.conn_type + endpoint = conn.extra_dejson.get('endpoint', 'pql') + return '{conn_type}://{host}/{endpoint}'.format( + conn_type=conn_type, host=host, endpoint=endpoint) + + def get_records(self, sql): + """ + Executes the sql and returns a set of records. 
+ + :param sql: the sql statement to be executed (str) or a list of + sql statements to execute + :type sql: str + """ + if six.PY2: + sql = sql.encode('utf-8') + + with self.get_conn() as cur: + cur.execute(sql) + return cur.fetchall() + + def get_first(self, sql): + """ + Executes the sql and returns the first resulting row. + + :param sql: the sql statement to be executed (str) or a list of + sql statements to execute + :type sql: str or list + """ + if six.PY2: + sql = sql.encode('utf-8') + + with self.get_conn() as cur: + cur.execute(sql) + return cur.fetchone() + + def set_autocommit(self, conn, autocommit): + raise NotImplementedError() + + def get_pandas_df(self, sql, parameters=None): + raise NotImplementedError() + + def insert_rows(self, table, rows, target_fields=None, commit_every=1000): + raise NotImplementedError() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/docs/code.rst ---------------------------------------------------------------------- diff --git a/docs/code.rst b/docs/code.rst index 16867f6..c979f26 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -365,6 +365,7 @@ Community contributed hooks .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook .. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook .. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook +.. autoclass:: airflow.contrib.hooks.pinot_hook.PinotDbApiHook .. autoclass:: airflow.contrib.hooks.qubole_hook.QuboleHook .. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook .. 
autoclass:: airflow.contrib.hooks.redshift_hook.RedshiftHook http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index fef8d8a..1842a14 100644 --- a/setup.py +++ b/setup.py @@ -151,6 +151,7 @@ mysql = ['mysqlclient>=1.3.6'] rabbitmq = ['librabbitmq>=1.6.1'] oracle = ['cx_Oracle>=5.1.2'] postgres = ['psycopg2-binary>=2.7.4'] +pinot = ['pinotdb>=0.1.1'] ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9'] salesforce = ['simple-salesforce>=0.72'] s3 = ['boto3>=1.7.0'] @@ -177,7 +178,7 @@ snowflake = ['snowflake-connector-python>=1.5.2', 'snowflake-sqlalchemy>=1.1.0'] zendesk = ['zdesk'] -all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid +all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid + pinot devel = [ 'click', 'freezegun', @@ -200,7 +201,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + - druid + snowflake + elasticsearch) + druid + pinot + snowflake + elasticsearch) # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'( if PY3: @@ -293,6 +294,7 @@ def do_setup(): 'mysql': mysql, 'oracle': oracle, 'password': password, + 'pinot': pinot, 'postgres': postgres, 'qds': qds, 'rabbitmq': rabbitmq, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/879e00a8/tests/contrib/hooks/test_pinot_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py new file mode 100644 index 0000000..d63ee1c --- /dev/null +++ b/tests/contrib/hooks/test_pinot_hook.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache 
Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import mock +import unittest + +from airflow.contrib.hooks.pinot_hook import PinotDbApiHook + + +class TestPinotDbApiHook(unittest.TestCase): + + def setUp(self): + super(TestPinotDbApiHook, self).setUp() + self.conn = conn = mock.MagicMock() + self.conn.host = 'host' + self.conn.port = '1000' + self.conn.conn_type = 'http' + self.conn.extra_dejson = {'endpoint': 'pql'} + self.cur = mock.MagicMock() + self.conn.__enter__.return_value = self.cur + self.conn.__exit__.return_value = None + + class TestPinotDBApiHook(PinotDbApiHook): + def get_conn(self): + return conn + + def get_connection(self, conn_id): + return conn + + self.db_hook = TestPinotDBApiHook + + def test_get_uri(self): + """ + Test on getting a pinot connection uri + """ + db_hook = self.db_hook() + self.assertEquals(db_hook.get_uri(), 'http://host:1000/pql') + + def test_get_conn(self): + """ + Test on getting a pinot connection + """ + conn = self.db_hook().get_conn() + self.assertEqual(conn.host, 'host') + self.assertEqual(conn.port, '1000') + self.assertEqual(conn.conn_type, 'http') + self.assertEqual(conn.extra_dejson.get('endpoint'), 'pql') + + def test_get_records(self): + statement = 'SQL' + result_sets = [('row1',), ('row2',)] + 
self.cur.fetchall.return_value = result_sets + self.assertEqual(result_sets, self.db_hook().get_records(statement)) + + def test_get_first(self): + statement = 'SQL' + result_sets = [('row1',), ('row2',)] + self.cur.fetchone.return_value = result_sets[0] + self.assertEqual(result_sets[0], self.db_hook().get_first(statement))