kaxil closed pull request #4274: [AIRFLOW-3438] Fix default values in BigQuery Hook & BigQueryOperator. URL: https://github.com/apache/incubator-airflow/pull/4274
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index c7324adde4..5cab013b28 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -566,7 +566,7 @@ def run_query(self, :param labels a dictionary containing labels for the job/query, passed to BigQuery :type labels: dict - :param schema_update_options: Allows the schema of the desitination + :param schema_update_options: Allows the schema of the destination table to be updated as a side effect of the query job. :type schema_update_options: tuple :param priority: Specifies a priority for the query. @@ -582,6 +582,9 @@ def run_query(self, :type cluster_fields: list of str """ + if time_partitioning is None: + time_partitioning = {} + if not api_resource_configs: api_resource_configs = self.api_resource_configs else: diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 7ce3102ad2..106bee8b69 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -106,13 +106,13 @@ class BigQueryOperator(BaseOperator): @apply_defaults def __init__(self, sql, - destination_dataset_table=False, + destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, - udf_config=False, + udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, @@ -144,10 +144,8 @@ def __init__(self, self.labels = labels self.bq_cursor = None self.priority = priority - if time_partitioning is None: - self.time_partitioning = {} - if api_resource_configs is 
None: - self.api_resource_configs = {} + self.time_partitioning = time_partitioning + self.api_resource_configs = api_resource_configs self.cluster_fields = cluster_fields def execute(self, context): @@ -160,7 +158,7 @@ def execute(self, context): conn = hook.get_conn() self.bq_cursor = conn.cursor() self.bq_cursor.run_query( - self.sql, + sql=self.sql, destination_dataset_table=self.destination_dataset_table, write_disposition=self.write_disposition, allow_large_results=self.allow_large_results, diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py index 4e62221395..b92116a031 100644 --- a/tests/contrib/operators/test_bigquery_operator.py +++ b/tests/contrib/operators/test_bigquery_operator.py @@ -21,7 +21,8 @@ from airflow.contrib.operators.bigquery_operator import \ BigQueryCreateExternalTableOperator, BigQueryCreateEmptyTableOperator, \ - BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator + BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator, \ + BigQueryOperator try: from unittest import mock @@ -143,3 +144,84 @@ def test_execute(self, mock_hook): project_id=TEST_PROJECT_ID, dataset_reference={} ) + + +class BigQueryOperatorTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook') + def test_execute(self, mock_hook): + operator = BigQueryOperator( + task_id=TASK_ID, + sql='Select * from test_table', + destination_dataset_table=None, + write_disposition='WRITE_EMPTY', + allow_large_results=False, + flatten_results=None, + bigquery_conn_id='bigquery_default', + udf_config=None, + use_legacy_sql=True, + maximum_billing_tier=None, + maximum_bytes_billed=None, + create_disposition='CREATE_IF_NEEDED', + schema_update_options=(), + query_params=None, + labels=None, + priority='INTERACTIVE', + time_partitioning=None, + api_resource_configs=None, + cluster_fields=None, + ) + + operator.execute(None) + mock_hook.return_value \ + .get_conn() 
\ + .cursor() \ + .run_query \ + .assert_called_once_with( + sql='Select * from test_table', + destination_dataset_table=None, + write_disposition='WRITE_EMPTY', + allow_large_results=False, + flatten_results=None, + udf_config=None, + maximum_billing_tier=None, + maximum_bytes_billed=None, + create_disposition='CREATE_IF_NEEDED', + schema_update_options=(), + query_params=None, + labels=None, + priority='INTERACTIVE', + time_partitioning=None, + api_resource_configs=None, + cluster_fields=None, + ) + + @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook') + def test_bigquery_operator_defaults(self, mock_hook): + operator = BigQueryOperator( + task_id=TASK_ID, + sql='Select * from test_table', + ) + + operator.execute(None) + mock_hook.return_value \ + .get_conn() \ + .cursor() \ + .run_query \ + .assert_called_once_with( + sql='Select * from test_table', + destination_dataset_table=None, + write_disposition='WRITE_EMPTY', + allow_large_results=False, + flatten_results=None, + udf_config=None, + maximum_billing_tier=None, + maximum_bytes_billed=None, + create_disposition='CREATE_IF_NEEDED', + schema_update_options=(), + query_params=None, + labels=None, + priority='INTERACTIVE', + time_partitioning=None, + api_resource_configs=None, + cluster_fields=None, + ) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services