Repository: incubator-airflow Updated Branches: refs/heads/master f1ac67bdc -> 3bdb34e77
[AIRFLOW-2472] Implement MySqlHook.bulk_dump Implement MySqlHook.bulk_dump since the opposite operation bulk_load is already implemented. This PR also addresses some flake8 warnings. Closes #3385 from sekikn/AIRFLOW-2472 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3bdb34e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3bdb34e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3bdb34e7 Branch: refs/heads/master Commit: 3bdb34e7775fdb85e3dd6cd92a47f91c80b1f67b Parents: f1ac67b Author: Kengo Seki <sek...@apache.org> Authored: Wed May 23 13:58:04 2018 -0700 Committer: r39132 <siddharthan...@yahoo.com> Committed: Wed May 23 13:58:04 2018 -0700 ---------------------------------------------------------------------- airflow/hooks/mysql_hook.py | 12 ++++ airflow/utils/tests.py | 8 +++ tests/hooks/test_hive_hook.py | 9 +-- tests/operators/operators.py | 75 ++++++++++++++------ tests/operators/test_redshift_to_s3_operator.py | 11 +-- tests/operators/test_s3_to_redshift_operator.py | 8 +-- 6 files changed, 79 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/hooks/mysql_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/mysql_hook.py b/airflow/hooks/mysql_hook.py index d202e55..f52b60c 100644 --- a/airflow/hooks/mysql_hook.py +++ b/airflow/hooks/mysql_hook.py @@ -89,6 +89,18 @@ class MySqlHook(DbApiHook): """.format(**locals())) conn.commit() + def bulk_dump(self, table, tmp_file): + """ + Dumps a database table into a tab-delimited file + """ + conn = self.get_conn() + cur = conn.cursor() + cur.execute(""" + SELECT * INTO OUTFILE '{tmp_file}' + FROM {table} + """.format(**locals())) + conn.commit() + @staticmethod def _serialize_cell(cell, conn): """ 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/utils/tests.py ---------------------------------------------------------------------- diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py index 9982a07..6f29ffc 100644 --- a/airflow/utils/tests.py +++ b/airflow/utils/tests.py @@ -17,8 +17,10 @@ # specific language governing permissions and limitations # under the License. +import re import unittest + def skipUnlessImported(module, obj): import importlib try: @@ -29,3 +31,9 @@ def skipUnlessImported(module, obj): obj in dir(m), "Skipping test because {} could not be imported from {}".format( obj, module)) + + +def assertEqualIgnoreMultipleSpaces(case, first, second, msg=None): + def _trim(s): + return re.sub("\s+", " ", s.strip()) + return case.assertEqual(_trim(first), _trim(second), msg) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/hooks/test_hive_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py index d090086..690b5ad 100644 --- a/tests/hooks/test_hive_hook.py +++ b/tests/hooks/test_hive_hook.py @@ -22,7 +22,6 @@ import datetime import itertools import pandas as pd import random -import re import mock import unittest @@ -34,6 +33,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook from airflow import DAG, configuration, operators from airflow.utils import timezone +from airflow.utils.tests import assertEqualIgnoreMultipleSpaces configuration.load_test_config() @@ -188,12 +188,7 @@ class TestHiveCliHook(unittest.TestCase): STORED AS textfile ; """ - - def _trim(s): - return re.sub("\s+", " ", s.strip()) - - self.assertEqual(_trim(mock_run_cli.call_args_list[0][0][0]), - _trim(query)) + assertEqualIgnoreMultipleSpaces(self, mock_run_cli.call_args_list[0][0][0], query) class TestHiveMetastoreHook(HiveEnvironmentTest): 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/operators.py ---------------------------------------------------------------------- diff --git a/tests/operators/operators.py b/tests/operators/operators.py index 9fc2c93..f795fd0 100644 --- a/tests/operators/operators.py +++ b/tests/operators/operators.py @@ -23,10 +23,12 @@ from airflow import DAG, configuration, operators from airflow.utils.tests import skipUnlessImported from airflow.utils import timezone -configuration.load_test_config() - +import os +import mock import unittest +configuration.load_test_config() + DEFAULT_DATE = timezone.datetime(2015, 1, 1) DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] @@ -51,8 +53,8 @@ class MySqlTest(unittest.TestCase): dummy VARCHAR(50) ); """ - import airflow.operators.mysql_operator - t = operators.mysql_operator.MySqlOperator( + from airflow.operators.mysql_operator import MySqlOperator + t = MySqlOperator( task_id='basic_mysql', sql=sql, mysql_conn_id='airflow_db', @@ -64,8 +66,8 @@ "TRUNCATE TABLE test_airflow", "INSERT INTO test_airflow VALUES ('X')", ] - import airflow.operators.mysql_operator - t = operators.mysql_operator.MySqlOperator( + from airflow.operators.mysql_operator import MySqlOperator + t = MySqlOperator( task_id='mysql_operator_test_multi', mysql_conn_id='airflow_db', sql=sql, dag=self.dag) @@ -93,10 +95,40 @@ results = tuple(result[0] for result in c.fetchall()) self.assertEqual(sorted(results), sorted(records)) + def test_mysql_hook_test_bulk_dump(self): + from airflow.hooks.mysql_hook import MySqlHook + hook = MySqlHook('airflow_ci') + priv = hook.get_first("SELECT @@global.secure_file_priv") + if priv and priv[0]: + # Confirm that no error occurs + hook.bulk_dump("INFORMATION_SCHEMA.TABLES", os.path.join(priv[0], "TABLES")) + else: + self.skipTest("Skip test_mysql_hook_test_bulk_dump " + "since file 
output is not permitted") + + @mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn') + def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn): + mock_execute = mock.MagicMock() + mock_get_conn.return_value.cursor.return_value.execute = mock_execute + + from airflow.hooks.mysql_hook import MySqlHook + hook = MySqlHook('airflow_ci') + table = "INFORMATION_SCHEMA.TABLES" + tmp_file = "/path/to/output/file" + hook.bulk_dump(table, tmp_file) + + from airflow.utils.tests import assertEqualIgnoreMultipleSpaces + mock_execute.assert_called_once() + query = """ + SELECT * INTO OUTFILE '{tmp_file}' + FROM {table} + """.format(tmp_file=tmp_file, table=table) + assertEqualIgnoreMultipleSpaces(self, mock_execute.call_args[0][0], query) + def test_mysql_to_mysql(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" - import airflow.operators.generic_transfer - t = operators.generic_transfer.GenericTransfer( + from airflow.operators.generic_transfer import GenericTransfer + t = GenericTransfer( task_id='test_m2m', preoperator=[ "DROP TABLE IF EXISTS test_mysql_to_mysql", @@ -114,10 +146,10 @@ class MySqlTest(unittest.TestCase): """ Verifies option to overwrite connection schema """ - import airflow.operators.mysql_operator + from airflow.operators.mysql_operator import MySqlOperator sql = "SELECT 1;" - t = operators.mysql_operator.MySqlOperator( + t = MySqlOperator( task_id='test_mysql_operator_test_schema_overwrite', sql=sql, dag=self.dag, @@ -146,9 +178,8 @@ class PostgresTest(unittest.TestCase): dummy VARCHAR(50) ); """ - import airflow.operators.postgres_operator - t = operators.postgres_operator.PostgresOperator( - task_id='basic_postgres', sql=sql, dag=self.dag) + from airflow.operators.postgres_operator import PostgresOperator + t = PostgresOperator(task_id='basic_postgres', sql=sql, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) autocommitTask = operators.postgres_operator.PostgresOperator( @@ -166,15 
+197,15 @@ class PostgresTest(unittest.TestCase): "TRUNCATE TABLE test_airflow", "INSERT INTO test_airflow VALUES ('X')", ] - import airflow.operators.postgres_operator - t = operators.postgres_operator.PostgresOperator( + from airflow.operators.postgres_operator import PostgresOperator + t = PostgresOperator( task_id='postgres_operator_test_multi', sql=sql, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) def test_postgres_to_postgres(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" - import airflow.operators.generic_transfer - t = operators.generic_transfer.GenericTransfer( + from airflow.operators.generic_transfer import GenericTransfer + t = GenericTransfer( task_id='test_p2p', preoperator=[ "DROP TABLE IF EXISTS test_postgres_to_postgres", @@ -192,10 +223,10 @@ class PostgresTest(unittest.TestCase): """ Verifies the VACUUM operation runs well with the PostgresOperator """ - import airflow.operators.postgres_operator + from airflow.operators.postgres_operator import PostgresOperator sql = "VACUUM ANALYZE;" - t = operators.postgres_operator.PostgresOperator( + t = PostgresOperator( task_id='postgres_operator_test_vacuum', sql=sql, dag=self.dag, @@ -206,10 +237,10 @@ class PostgresTest(unittest.TestCase): """ Verifies option to overwrite connection schema """ - import airflow.operators.postgres_operator + from airflow.operators.postgres_operator import PostgresOperator sql = "SELECT 1;" - t = operators.postgres_operator.PostgresOperator( + t = PostgresOperator( task_id='postgres_operator_test_schema_overwrite', sql=sql, dag=self.dag, @@ -242,7 +273,6 @@ class TransferTests(unittest.TestCase): end_date=timezone.utcnow()) def test_mysql_to_hive(self): - # import airflow.operators from airflow.operators.mysql_to_hive import MySqlToHiveTransfer sql = "SELECT * FROM baby_names LIMIT 1000;" t = MySqlToHiveTransfer( @@ -273,7 +303,6 @@ class TransferTests(unittest.TestCase): t.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE, ignore_ti_state=True) def test_mysql_to_hive_tblproperties(self): - # import airflow.operators from airflow.operators.mysql_to_hive import MySqlToHiveTransfer sql = "SELECT * FROM baby_names LIMIT 1000;" t = MySqlToHiveTransfer( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_redshift_to_s3_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/test_redshift_to_s3_operator.py b/tests/operators/test_redshift_to_s3_operator.py index e214f3d..379e787 100644 --- a/tests/operators/test_redshift_to_s3_operator.py +++ b/tests/operators/test_redshift_to_s3_operator.py @@ -19,11 +19,11 @@ # import mock -import re import unittest from boto3.session import Session from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer +from airflow.utils.tests import assertEqualIgnoreMultipleSpaces class TestRedshiftToS3Transfer(unittest.TestCase): @@ -92,13 +92,8 @@ class TestRedshiftToS3Transfer(unittest.TestCase): secret_key=secret_key, unload_options=unload_options) - def _trim(s): - return re.sub("\s+", " ", s.strip()) - - self.assertEqual(_trim(cur.execute.call_args[0][0]), - _trim(columns_query)) cur.execute.assert_called_once() + assertEqualIgnoreMultipleSpaces(self, cur.execute.call_args[0][0], columns_query) - self.assertEqual(_trim(mock_run.call_args[0][0]), - _trim(unload_query)) mock_run.assert_called_once() + assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], unload_query) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_s3_to_redshift_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/test_s3_to_redshift_operator.py b/tests/operators/test_s3_to_redshift_operator.py index 0fadda3..2afde6c 100644 --- a/tests/operators/test_s3_to_redshift_operator.py +++ b/tests/operators/test_s3_to_redshift_operator.py @@ -19,11 +19,11 @@ # 
import mock -import re import unittest from boto3.session import Session from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer +from airflow.utils.tests import assertEqualIgnoreMultipleSpaces class TestS3ToRedshiftTransfer(unittest.TestCase): @@ -67,9 +67,5 @@ class TestS3ToRedshiftTransfer(unittest.TestCase): secret_key=secret_key, copy_options=copy_options) - def _trim(s): - return re.sub("\s+", " ", s.strip()) - - self.assertEqual(_trim(mock_run.call_args[0][0]), - _trim(copy_query)) mock_run.assert_called_once() + assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], copy_query)