lawrencestfs opened a new issue #17800:
URL: https://github.com/apache/airflow/issues/17800


   
   **Apache Airflow version**: 1.10.15
   
   **OS**: Linux 5.4.109+
   
   **Apache Airflow Provider versions**:
   apache-airflow-backport-providers-apache-beam==2021.3.13
   apache-airflow-backport-providers-cncf-kubernetes==2021.3.3
   apache-airflow-backport-providers-google==2021.3.3
   
   **Deployment**: Cloud Composer 1.16.6 (Google Cloud Managed Airflow Service)
   
   **What happened**:
   
   The BigQueryCreateExternalTableOperator from the providers package ([airflow.providers.google.cloud.operators.bigquery](https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.py)) fails even when the _schema_object_ parameter is correct.
   
   **What you expected to happen**:
   
   I expected the DAG to run successfully, as I had previously tested it with the deprecated operator from the contrib package ([airflow.contrib.operators.bigquery_operator](https://github.com/apache/airflow/blob/5786dcdc392f7a2649f398353a0beebef01c428e/airflow/contrib/operators/bigquery_operator.py#L476)), using the same parameters.
   
   Debugging the DAG execution log, I saw that the providers operator generated an incorrect call to the Cloud Storage API: it mixed up the bucket and object parameters, as shown in the stack trace below.
   
   ```
   [2021-08-23 23:17:22,316] {taskinstance.py:1152} ERROR - 404 GET https://storage.googleapis.com/download/storage/v1/b/foo/bar/schema.json/o/mybucket?alt=media: Not Found: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
   Traceback (most recent call last):
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/client.py", line 728, in download_blob_to_file
       checksum=checksum,
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 956, in _do_download
       response = download.consume(transport, timeout=timeout)
     File "/opt/python3.6/lib/python3.6/site-packages/google/resumable_media/requests/download.py", line 168, in consume
       self._process_response(result)
     File "/opt/python3.6/lib/python3.6/site-packages/google/resumable_media/_download.py", line 186, in _process_response
       response, _ACCEPTABLE_STATUS_CODES, self._get_status_code
     File "/opt/python3.6/lib/python3.6/site-packages/google/resumable_media/_helpers.py", line 104, in require_status_code
       *status_codes
   google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/bigquery.py", line 1178, in execute
       schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object))
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/gcs.py", line 301, in download
       return blob.download_as_string()
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1391, in download_as_string
       timeout=timeout,
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1302, in download_as_bytes
       checksum=checksum,
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/client.py", line 731, in download_blob_to_file
       _raise_from_invalid_response(exc)
     File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 3936, in _raise_from_invalid_response
       raise exceptions.from_http_status(response.status_code, message, response=response)
   google.api_core.exceptions.NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/foo/bar/schema.json/o/mybucket?alt=media: Not Found: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
   ```
   PS: the bucket (_mybucket_) and object path (_foo/bar/schema.json_) were 
masked for security reasons.
   
   I believe the error appears on the [following](https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.py#L1183) line, although the bug itself is probably located in the [gcs_hook.download()](https://github.com/apache/airflow/blob/0264fea8c2024d7d3d64aa0ffa28a0cfa48839cd/airflow/providers/google/cloud/hooks/gcs.py#L266) method:
   
   `schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object))`
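   
   A possible fix, sketched below, would be to pass the bucket and object by keyword and to restore the decode step. This is only a sketch: it assumes the hook's _download()_ method accepts _bucket_name_ and _object_name_ keyword arguments, and it reuses the masked placeholder names from this report.
   
   ```
   import json
   
   from airflow.providers.google.cloud.hooks.gcs import GCSHook
   
   # Sketch only: passing the arguments by keyword sidesteps any positional
   # mix-up between bucket and object, and the decode restores the behaviour
   # of the contrib operator. Bucket/object names are placeholders from above.
   gcs_hook = GCSHook()
   schema_fields = json.loads(
       gcs_hook.download(
           bucket_name='mybucket',
           object_name='foo/bar/schema.json',
       ).decode('utf-8')
   )
   ```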
   
   **How to reproduce it**:
   
   Create a DAG using both operators with the same parameters, as in the example below. The task using the contrib version of the operator should succeed, while the task using the providers version should fail.
   
   ```
   from airflow.contrib.operators.bigquery_operator import BigQueryCreateExternalTableOperator as BQExtTabOptContrib
   from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator as BQExtTabOptProviders
   
   #TODO: default args and DAG definition
   
   create_landing_external_table_contrib = BQExtTabOptContrib(
       task_id='create_landing_external_table_contrib',
       bucket='mybucket',
       source_objects=['foo/bar/*.csv'],
       destination_project_dataset_table='project.dataset.table',
       schema_object='foo/bar/schema_file.json',
   )
   
   create_landing_external_table_providers = BQExtTabOptProviders(
       task_id='create_landing_external_table_providers',
       bucket='mybucket',
       source_objects=['foo/bar/*.csv'],
       destination_project_dataset_table='project.dataset.table',
       schema_object='foo/bar/schema_file.json',
   )
   ```
   
   **Anything else we need to know**:
   
   The [*gcs_hook.download()*](https://github.com/apache/airflow/blob/0264fea8c2024d7d3d64aa0ffa28a0cfa48839cd/airflow/providers/google/cloud/hooks/gcs.py#L313) method uses the deprecated _download_as_string()_ method from the Cloud Storage client library (https://googleapis.dev/python/storage/latest/blobs.html). It should be changed to _download_as_bytes()_.
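   
   For illustration, a minimal standalone sketch of the replacement call against the Cloud Storage client, assuming a google-cloud-storage release recent enough to expose _download_as_bytes()_ and reusing the masked placeholder names from this report:
   
   ```
   from google.cloud import storage
   
   client = storage.Client()
   blob = client.bucket('mybucket').blob('foo/bar/schema.json')
   
   # download_as_string() is deprecated in recent google-cloud-storage
   # releases; download_as_bytes() returns the same payload as bytes.
   data = blob.download_as_bytes()
   ```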
   
   Also, comparing the [providers version](https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.py#L1183) of the operator to the [contrib version](https://github.com/apache/airflow/blob/5786dcdc392f7a2649f398353a0beebef01c428e/airflow/contrib/operators/bigquery_operator.py#L621), I noticed that the providers version is also missing the decode operation: `.decode("utf-8")`.
   
   **Are you willing to submit a PR?**
   
   Yes

