[ https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anonymous reassigned AIRFLOW-2146:
----------------------------------

    Assignee:     (was: Kaxil Naik)

> Initialize default Google BigQuery Connection with valid conn_type & Fix
> broken DBApiHook
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2146
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2146
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: contrib, gcp
>            Reporter: Kaxil Naik
>            Priority: Major
>             Fix For: 1.10.0
>
>
> `airflow initdb` creates a connection with conn_id='bigquery_default' and
> conn_type='bigquery'. However, bigquery is not a valid conn_type according
> to models.Connection._types, and BigQuery connections should use the
> google_cloud_platform conn_type.
> Also, as [renanleme|https://github.com/renanleme] mentioned
> [here|https://github.com/apache/incubator-airflow/pull/3031#issuecomment-368132910],
> the DAGs he created are broken when using `get_records()` from
> BigQueryHook, which extends DbApiHook.
> *Error Log*:
> {code}
> Traceback (most recent call last):
>   File "/src/apache-airflow/airflow/models.py", line 1519, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/airflow/dags/lib/operators/test_operator.py", line 21, in execute
>     records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>   File "/src/apache-airflow/airflow/hooks/base_hook.py", line 92, in get_records
>     raise NotImplementedError()
> {code}
> *Dag*:
> {code:python}
> from datetime import datetime
>
> from airflow import DAG
> from lib.operators.test_operator import TestOperator
>
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2018, 2, 21),
> }
>
> dag = DAG(
>     'test_dag',
>     default_args=default_args,
>     schedule_interval='0 6 * * *'
> )
>
> sql = '''
> SELECT id from YOUR_BIGQUERY_TABLE limit 10
> '''
>
> compare_grouped_event = TestOperator(
>     task_id='test_operator',
>     source_conn_id='gcp_airflow',
>     sql=sql,
>     dag=dag
> )
> {code}
> *Operator*:
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> from airflow.models import BaseOperator
> from airflow.utils.decorators import apply_defaults
>
>
> class TestOperator(BaseOperator):
>
>     @apply_defaults
>     def __init__(
>             self,
>             sql,
>             source_conn_id=None,
>             *args, **kwargs):
>         super(TestOperator, self).__init__(*args, **kwargs)
>         self.sql = sql
>         self.source_conn_id = source_conn_id
>
>     def execute(self, context=None):
>         records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>         self.log.info('Fetched records from source')
>
>     @staticmethod
>     def _get_db_hook(conn_id):
>         return BaseHook.get_hook(conn_id=conn_id)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
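The `NotImplementedError` in the log above is raised by the base hook's `get_records()`, which only DB-API-style subclasses override. A minimal stdlib-only sketch of that pattern (not Airflow code; `SqliteDbApiHook` is a hypothetical stand-in playing the role `DbApiHook` plays for `BigQueryHook`):

```python
import sqlite3


class BaseHook:
    # Mirrors airflow.hooks.base_hook.BaseHook: get_records is not
    # implemented at the base, so calling it there fails at runtime.
    def get_records(self, sql):
        raise NotImplementedError()


class SqliteDbApiHook(BaseHook):
    # Hypothetical DB-API-style hook backed by an in-memory SQLite table.
    def get_conn(self):
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE t (id INTEGER)")
        conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
        return conn

    def get_records(self, sql):
        # The DB-API contract: open a connection, run the query, fetch rows.
        conn = self.get_conn()
        try:
            return conn.execute(sql).fetchall()
        finally:
            conn.close()


hook = SqliteDbApiHook()
print(hook.get_records("SELECT id FROM t LIMIT 10"))  # [(1,), (2,), (3,)]
```

The operator in the report fails because `BaseHook.get_hook()` resolves the hook from the connection's conn_type; with an invalid conn_type the lookup cannot reach a subclass that implements `get_records()`, so the call falls through to the base-class stub shown here.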