This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 42334b0f9145e70eb23938cad78589f6a97731dc Author: dzamo <9107319+dz...@users.noreply.github.com> AuthorDate: Mon Jul 12 19:59:35 2021 +0200 AIRFLOW-5529 Add Apache Drill provider. (#16884) (cherry picked from commit 8808b641942e1b81c21db054fd6d36e2031cfab8) --- CONTRIBUTING.rst | 22 ++--- INSTALL | 24 +++--- airflow/providers/apache/drill/CHANGELOG.rst | 25 ++++++ airflow/providers/apache/drill/__init__.py | 17 ++++ .../apache/drill/example_dags/example_drill_dag.py | 46 +++++++++++ airflow/providers/apache/drill/hooks/__init__.py | 17 ++++ airflow/providers/apache/drill/hooks/drill.py | 89 +++++++++++++++++++++ .../providers/apache/drill/operators/__init__.py | 17 ++++ airflow/providers/apache/drill/operators/drill.py | 71 ++++++++++++++++ airflow/providers/apache/drill/provider.yaml | 49 ++++++++++++ airflow/ui/src/views/Docs.tsx | 1 + airflow/utils/db.py | 10 +++ .../commits.rst | 23 ++++++ .../connections/drill.rst | 44 ++++++++++ .../index.rst | 50 ++++++++++++ .../operators.rst | 51 ++++++++++++ docs/apache-airflow/extra-packages-ref.rst | 2 + docs/conf.py | 1 + docs/integration-logos/apache/drill.png | Bin 0 -> 40173 bytes docs/spelling_wordlist.txt | 1 + setup.py | 3 + tests/providers/apache/drill/__init__.py | 17 ++++ tests/providers/apache/drill/hooks/__init__.py | 17 ++++ tests/providers/apache/drill/hooks/test_drill.py | 84 +++++++++++++++++++ tests/providers/apache/drill/operators/__init__.py | 17 ++++ .../providers/apache/drill/operators/test_drill.py | 63 +++++++++++++++ 26 files changed, 738 insertions(+), 23 deletions(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 98cdf93..be807f4 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -573,17 +573,17 @@ This is the full list of those extras: .. 
START EXTRAS HERE -airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, -apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, -apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, -cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci, -devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, -gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, -jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, -microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, -password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, -segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, -telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill, +apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, +apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, +cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, +devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, +facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, +http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, +microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, +pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, +salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, 
spark, sqlite, +ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk .. END EXTRAS HERE diff --git a/INSTALL b/INSTALL index 111b51f..554af5c 100644 --- a/INSTALL +++ b/INSTALL @@ -1,6 +1,6 @@ # INSTALL / BUILD instructions for Apache Airflow -This ia a generic installation method that requires a number of dependencies to be installed. +This is a generic installation method that requires a number of dependencies to be installed. Depending on your system you might need different prerequisites, but the following systems/prerequisites are known to work: @@ -89,17 +89,17 @@ The list of available extras: # START EXTRAS HERE -airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, -apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, -apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, -cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci, -devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, -gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, -jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, -microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, -password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, -segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, -telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill, +apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, +apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, +cgroups, 
cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, +devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, +facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, +http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, +microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, +pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, +salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, +ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk # END EXTRAS HERE diff --git a/airflow/providers/apache/drill/CHANGELOG.rst b/airflow/providers/apache/drill/CHANGELOG.rst new file mode 100644 index 0000000..cef7dda --- /dev/null +++ b/airflow/providers/apache/drill/CHANGELOG.rst @@ -0,0 +1,25 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Changelog +--------- + +1.0.0 +..... + +Initial version of the provider. 
diff --git a/airflow/providers/apache/drill/__init__.py b/airflow/providers/apache/drill/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/apache/drill/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/drill/example_dags/example_drill_dag.py b/airflow/providers/apache/drill/example_dags/example_drill_dag.py new file mode 100644 index 0000000..60a35ee --- /dev/null +++ b/airflow/providers/apache/drill/example_dags/example_drill_dag.py @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG to execute SQL in an Apache Drill environment using the +`DrillOperator`. +""" +from airflow.models import DAG +from airflow.providers.apache.drill.operators.drill import DrillOperator +from airflow.utils.dates import days_ago + +args = { + 'owner': 'Airflow', +} + +with DAG( + dag_id='example_drill_dag', + default_args=args, + schedule_interval=None, + start_date=days_ago(2), + tags=['example'], +) as dag: + # [START howto_operator_drill] + sql_task = DrillOperator( + task_id='json_to_parquet_table', + sql=''' + drop table if exists dfs.tmp.employee; + create table dfs.tmp.employee as select * from cp.`employee.json`; + ''', + ) + # [END howto_operator_drill] diff --git a/airflow/providers/apache/drill/hooks/__init__.py b/airflow/providers/apache/drill/hooks/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/apache/drill/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/drill/hooks/drill.py b/airflow/providers/apache/drill/hooks/drill.py new file mode 100644 index 0000000..470be8c --- /dev/null +++ b/airflow/providers/apache/drill/hooks/drill.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Iterable, Optional, Tuple + +from sqlalchemy import create_engine +from sqlalchemy.engine import Connection + +from airflow.hooks.dbapi import DbApiHook + + +class DrillHook(DbApiHook): + """ + Interact with Apache Drill via sqlalchemy-drill. + + You can specify the SQLAlchemy dialect and driver that sqlalchemy-drill + will employ to communicate with Drill in the extras field of your + connection, e.g. ``{"dialect_driver": "drill+sadrill"}`` for communication + over Drill's REST API. 
See the sqlalchemy-drill documentation for + descriptions of the supported dialects and drivers. + + You can specify the default storage_plugin for the sqlalchemy-drill + connection using the extras field e.g. ``{"storage_plugin": "dfs"}``. + """ + + conn_name_attr = 'drill_conn_id' + default_conn_name = 'drill_default' + conn_type = 'drill' + hook_name = 'Drill' + supports_autocommit = False + + def get_conn(self) -> Connection: + """Establish a connection to Drillbit.""" + conn_md = self.get_connection(getattr(self, self.conn_name_attr)) + creds = f'{conn_md.login}:{conn_md.password}@' if conn_md.login else '' + engine = create_engine( + f'{conn_md.extra_dejson.get("dialect_driver", "drill+sadrill")}://{creds}' + f'{conn_md.host}:{conn_md.port}/' + f'{conn_md.extra_dejson.get("storage_plugin", "dfs")}' + ) + + self.log.info( + 'Connected to the Drillbit at %s:%s as user %s', conn_md.host, conn_md.port, conn_md.login + ) + return engine.raw_connection() + + def get_uri(self) -> str: + """ + Returns the connection URI + + e.g: ``drill://localhost:8047/dfs`` + """ + conn_md = self.get_connection(getattr(self, self.conn_name_attr)) + host = conn_md.host + if conn_md.port is not None: + host += f':{conn_md.port}' + conn_type = 'drill' if not conn_md.conn_type else conn_md.conn_type + dialect_driver = conn_md.extra_dejson.get('dialect_driver', 'drill+sadrill') + storage_plugin = conn_md.extra_dejson.get('storage_plugin', 'dfs') + return f'{conn_type}://{host}/{storage_plugin}' f'?dialect_driver={dialect_driver}' + + def set_autocommit(self, conn: Connection, autocommit: bool) -> NotImplemented: + raise NotImplementedError("There are no transactions in Drill.") + + def insert_rows( + self, + table: str, + rows: Iterable[Tuple[str]], + target_fields: Optional[Iterable[str]] = None, + commit_every: int = 1000, + replace: bool = False, + **kwargs: Any, + ) -> NotImplemented: + raise NotImplementedError("There is no INSERT statement in Drill.") diff --git 
a/airflow/providers/apache/drill/operators/__init__.py b/airflow/providers/apache/drill/operators/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/apache/drill/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/drill/operators/drill.py b/airflow/providers/apache/drill/operators/drill.py new file mode 100644 index 0000000..459c623 --- /dev/null +++ b/airflow/providers/apache/drill/operators/drill.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Iterable, Mapping, Optional, Union + +import sqlparse + +from airflow.models import BaseOperator +from airflow.providers.apache.drill.hooks.drill import DrillHook +from airflow.utils.decorators import apply_defaults + + +class DrillOperator(BaseOperator): + """ + Executes the provided SQL in the identified Drill environment. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DrillOperator` + + :param sql: the SQL code to be executed. (templated) + :type sql: Can receive a str representing a sql statement, + a list of str (sql statements), or a reference to a template file. + Template references are recognized by str ending in '.sql' + :param drill_conn_id: id of the connection config for the target Drill + environment + :type drill_conn_id: str + :param parameters: (optional) the parameters to render the SQL query with. 
+ :type parameters: dict or iterable + """ + + template_fields = ('sql',) + template_fields_renderers = {'sql': 'sql'} + template_ext = ('.sql',) + ui_color = '#ededed' + + @apply_defaults + def __init__( + self, + *, + sql: str, + drill_conn_id: str = 'drill_default', + parameters: Optional[Union[Mapping, Iterable]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.sql = sql + self.drill_conn_id = drill_conn_id + self.parameters = parameters + self.hook = None + + def execute(self, context): + self.log.info('Executing: %s on %s', self.sql, self.drill_conn_id) + self.hook = DrillHook(drill_conn_id=self.drill_conn_id) + sql = sqlparse.split(sqlparse.format(self.sql, strip_comments=True)) + no_term_sql = [s[:-1] for s in sql if s[-1] == ';'] + self.hook.run(no_term_sql, parameters=self.parameters) diff --git a/airflow/providers/apache/drill/provider.yaml b/airflow/providers/apache/drill/provider.yaml new file mode 100644 index 0000000..88021e6 --- /dev/null +++ b/airflow/providers/apache/drill/provider.yaml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-apache-drill +name: Apache Drill +description: | + `Apache Drill <https://drill.apache.org/>`__. 
+ +versions: + - 1.0.0 + +additional-dependencies: + - apache-airflow>=2.1.0 + +integrations: + - integration-name: Apache Drill + external-doc-url: https://drill.apache.org/ + how-to-guide: + - /docs/apache-airflow-providers-apache-drill/operators.rst + logo: /integration-logos/apache/drill.png + tags: [apache] + +operators: + - integration-name: Apache Drill + python-modules: + - airflow.providers.apache.drill.operators.drill + +hooks: + - integration-name: Apache Drill + python-modules: + - airflow.providers.apache.drill.hooks.drill + +hook-class-names: + - airflow.providers.apache.drill.hooks.drill.DrillHook diff --git a/airflow/ui/src/views/Docs.tsx b/airflow/ui/src/views/Docs.tsx index 754b803..ea42ca6 100644 --- a/airflow/ui/src/views/Docs.tsx +++ b/airflow/ui/src/views/Docs.tsx @@ -41,6 +41,7 @@ const Docs: React.FC = () => { { path: 'amazon', name: 'Amazon' }, { path: 'apache-beam', name: 'Apache Beam' }, { path: 'apache-cassandra', name: 'Apache Cassandra' }, + { path: 'apache-drill', name: 'Apache Drill' }, { path: 'apache-druid', name: 'Apache Druid' }, { path: 'apache-hdfs', name: 'Apache HDFS' }, { path: 'apache-hive', name: 'Apache Hive' }, diff --git a/airflow/utils/db.py b/airflow/utils/db.py index ae8dc0e..69e2e00 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -168,6 +168,16 @@ def create_default_connections(session=None): ) merge_conn( Connection( + conn_id="drill_default", + conn_type="drill", + host="localhost", + port=8047, + extra='{"dialect_driver": "drill+sadrill", "storage_plugin": "dfs"}', + ), + session, + ) + merge_conn( + Connection( conn_id="druid_broker_default", conn_type="druid", host="druid-broker", diff --git a/docs/apache-airflow-providers-apache-drill/commits.rst b/docs/apache-airflow-providers-apache-drill/commits.rst new file mode 100644 index 0000000..deb31b3 --- /dev/null +++ b/docs/apache-airflow-providers-apache-drill/commits.rst @@ -0,0 +1,23 @@ + + .. 
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Package apache-airflow-providers-apache-drill +------------------------------------------------------ + +`Apache Drill <https://drill.apache.org/>`__. diff --git a/docs/apache-airflow-providers-apache-drill/connections/drill.rst b/docs/apache-airflow-providers-apache-drill/connections/drill.rst new file mode 100644 index 0000000..05e00a2 --- /dev/null +++ b/docs/apache-airflow-providers-apache-drill/connections/drill.rst @@ -0,0 +1,44 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:drill: + +Apache Drill Connection +======================= + +The Apache Drill connection type configures a connection to Apache Drill via the sqlalchemy-drill Python package. + +Default Connection IDs +---------------------- + +Drill hooks and operators use ``drill_default`` by default. + +Configuring the Connection +-------------------------- +Host (required) + The host of the Drillbit to connect to (HTTP, JDBC) or the DSN of the Drill ODBC connection. + +Port (optional) + The port of the Drillbit to connect to. + +Extra (optional) + A JSON dictionary specifying the extra parameters that can be used in sqlalchemy-drill connection. + + * ``dialect_driver`` - The dialect and driver as understood by sqlalchemy-drill. Defaults to ``drill+sadrill`` (HTTP). + * ``storage_plugin`` - The default Drill storage plugin for this connection. Defaults to ``dfs``. diff --git a/docs/apache-airflow-providers-apache-drill/index.rst b/docs/apache-airflow-providers-apache-drill/index.rst new file mode 100644 index 0000000..9ef0f2e --- /dev/null +++ b/docs/apache-airflow-providers-apache-drill/index.rst @@ -0,0 +1,50 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied.
See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-apache-drill`` +========================================= + +Content +------- + +.. toctree:: + :maxdepth: 1 + :caption: Guides + + Connection types <connections/drill> + Operators <operators> + +.. toctree:: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/apache/drill/index> + +.. toctree:: + :maxdepth: 1 + :caption: Resources + + Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/drill/example_dags> + PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-drill/> + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + +.. toctree:: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits <commits> diff --git a/docs/apache-airflow-providers-apache-drill/operators.rst b/docs/apache-airflow-providers-apache-drill/operators.rst new file mode 100644 index 0000000..f2d5cf5 --- /dev/null +++ b/docs/apache-airflow-providers-apache-drill/operators.rst @@ -0,0 +1,51 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Apache Drill Operators +====================== + +..
contents:: + :depth: 1 + :local: + +Prerequisite +------------ + +To use ``DrillOperator``, you must configure a :doc:`Drill Connection <connections/drill>`. + + +.. _howto/operator:DrillOperator: + +DrillOperator +------------- + +Executes one or more SQL queries on an Apache Drill server. The ``sql`` parameter can be templated and be an external ``.sql`` file. + +Using the operator +"""""""""""""""""" + +.. exampleinclude:: /../../airflow/providers/apache/drill/example_dags/example_drill_dag.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_drill] + :end-before: [END howto_operator_drill] + +Reference +""""""""" + +For further information, see `the Drill documentation on querying data <http://apache.github.io/drill/docs/query-data/>`_. diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index b1dff07..2d4769e 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -116,6 +116,8 @@ custom bash/python providers). 
+---------------------+-----------------------------------------------------+------------------------------------------------+ | apache.cassandra | ``pip install 'apache-airflow[apache.cassandra]'`` | Cassandra related operators & hooks | +---------------------+-----------------------------------------------------+------------------------------------------------+ +| apache.drill | ``pip install 'apache-airflow[apache.drill]'`` | Drill related operators & hooks | ++---------------------+-----------------------------------------------------+------------------------------------------------+ | apache.druid | ``pip install 'apache-airflow[apache.druid]'`` | Druid related operators & hooks | +---------------------+-----------------------------------------------------+------------------------------------------------+ | apache.hdfs | ``pip install 'apache-airflow[apache.hdfs]'`` | HDFS hooks and operators | diff --git a/docs/conf.py b/docs/conf.py index 3046303..da3b8e2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -512,6 +512,7 @@ autodoc_mock_imports = [ 'slack_sdk', 'smbclient', 'snowflake', + 'sqlalchemy-drill', 'sshtunnel', 'telegram', 'tenacity', diff --git a/docs/integration-logos/apache/drill.png b/docs/integration-logos/apache/drill.png new file mode 100644 index 0000000..9f76b61 Binary files /dev/null and b/docs/integration-logos/apache/drill.png differ diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index c476512..7631c6c 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -120,6 +120,7 @@ Docstring Docstrings Dont Driesprong +Drillbit Drivy Dsn Dynamodb diff --git a/setup.py b/setup.py index 9e8c28d..9dde824 100644 --- a/setup.py +++ b/setup.py @@ -255,6 +255,7 @@ doc = [ docker = [ 'docker', ] +drill = ['sqlalchemy-drill>=1.1.0', 'sqlparse>=0.4.1'] druid = [ 'pydruid>=0.4.1', ] @@ -534,6 +535,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = { 'amazon': amazon, 'apache.beam': apache_beam, 'apache.cassandra': 
cassandra, + 'apache.drill': drill, 'apache.druid': druid, 'apache.hdfs': hdfs, 'apache.hive': hive, @@ -724,6 +726,7 @@ ALL_PROVIDERS = list(PROVIDERS_REQUIREMENTS.keys()) ALL_DB_PROVIDERS = [ 'apache.cassandra', + 'apache.drill', 'apache.druid', 'apache.hdfs', 'apache.hive', diff --git a/tests/providers/apache/drill/__init__.py b/tests/providers/apache/drill/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/tests/providers/apache/drill/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/drill/hooks/__init__.py b/tests/providers/apache/drill/hooks/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/tests/providers/apache/drill/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/drill/hooks/test_drill.py b/tests/providers/apache/drill/hooks/test_drill.py new file mode 100644 index 0000000..97ed71f --- /dev/null +++ b/tests/providers/apache/drill/hooks/test_drill.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+
+import unittest
+from unittest.mock import MagicMock
+
+from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+
+class TestDrillHook(unittest.TestCase):
+    """Unit tests for ``DrillHook`` that run against mocked connection and cursor objects."""
+
+    def setUp(self):
+        """Create the mocked connection and cursor, and a ``DrillHook`` subclass that returns them."""
+        self.cur = MagicMock(rowcount=0)
+        self.conn = conn = MagicMock()
+        self.conn.login = 'drill_user'
+        self.conn.password = 'secret'
+        self.conn.host = 'host'
+        self.conn.port = '8047'
+        self.conn.conn_type = 'drill'
+        self.conn.extra_dejson = {'dialect_driver': 'drill+sadrill', 'storage_plugin': 'dfs'}
+        self.conn.cursor.return_value = self.cur
+
+        # Named differently from the enclosing test case so the helper does not shadow it.
+        class UnitTestDrillHook(DrillHook):
+            """``DrillHook`` whose ``get_conn`` and ``get_connection`` both return the shared mock."""
+
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = UnitTestDrillHook
+
+    def test_get_uri(self):
+        """Check ``get_uri()`` against the URI expected for the mocked connection settings."""
+        db_hook = self.db_hook()
+        assert 'drill://host:8047/dfs?dialect_driver=drill+sadrill' == db_hook.get_uri()
+
+    def test_get_first_record(self):
+        """``get_first`` returns the cursor's ``fetchone`` row and closes cursor and connection once."""
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+
+        assert result_sets[0] == self.db_hook().get_first(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_records(self):
+        """``get_records`` returns the cursor's ``fetchall`` rows and closes cursor and connection once."""
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+
+        assert result_sets == self.db_hook().get_records(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_pandas_df(self):
+        """``get_pandas_df`` returns a frame carrying the cursor's column name and row values."""
+        statement = 'SQL'
+        column = 'col'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.description = [(column,)]
+        self.cur.fetchall.return_value = result_sets
+        df = self.db_hook().get_pandas_df(statement)
+
+        assert column == df.columns[0]
+        # Compare the first column in one go; unlike an index loop this also catches extra rows.
+        assert [row[0] for row in result_sets] == [row[0] for row in df.values.tolist()]
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
diff --git a/tests/providers/apache/drill/operators/__init__.py b/tests/providers/apache/drill/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/operators/test_drill.py b/tests/providers/apache/drill/operators/test_drill.py
new file mode 100644
index 0000000..3572d85
--- /dev/null
+++ b/tests/providers/apache/drill/operators/test_drill.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.drill.operators.drill import DrillOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_test_dag'
+
+
+# NOTE(review): the ``backend("drill")`` marker presumably limits these tests to runs that have a
+# Drill backend available -- confirm against the project's pytest configuration.
+@pytest.mark.backend("drill")
+class TestDrillOperator(unittest.TestCase):
+    """Tests that run ``DrillOperator`` tasks creating and querying ``dfs.tmp.test_airflow``."""
+
+    def setUp(self):
+        """Create the DAG that the operators under test are attached to."""
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+    def tearDown(self):
+        """Drop the table created by the tests so it does not leak into the next test."""
+        tables_to_drop = ['dfs.tmp.test_airflow']
+        from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+        with DrillHook().get_conn() as conn:
+            with conn.cursor() as cur:
+                for table in tables_to_drop:
+                    cur.execute(f"DROP TABLE IF EXISTS {table}")
+
+    def test_drill_operator_single(self):
+        """Run the operator with a single multi-line CTAS statement."""
+        sql = """
+        create table dfs.tmp.test_airflow as
+        select * from cp.`employee.json` limit 10
+        """
+        op = DrillOperator(task_id='drill_operator_test_single', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_drill_operator_multi(self):
+        """Run the operator with a list of two statements: a CTAS, then a query on the new table."""
+        # Each statement is one literal: adjacent string literals are joined without a separator,
+        # which previously produced "... test_airflow asselect ..." and "...(full_name)from ...".
+        sql = [
+            "create table dfs.tmp.test_airflow as select * from cp.`employee.json` limit 10",
+            "select sum(employee_id), any_value(full_name) from dfs.tmp.test_airflow",
+        ]
+        op = DrillOperator(task_id='drill_operator_test_multi', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)