[ 
https://issues.apache.org/jira/browse/BEAM-5457?focusedWorklogId=152445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152445
 ]

ASF GitHub Bot logged work on BEAM-5457:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/18 22:26
            Start Date: 08/Oct/18 22:26
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #6463: [BEAM-5457] 
Auto-detect BigQuerySource(query=...) source location
URL: https://github.com/apache/beam/pull/6463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 58db0957113..3f2fb8bb693 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -646,7 +646,7 @@ def __init__(self, source, test_bigquery_client=None, 
use_legacy_sql=True,
     self.use_legacy_sql = use_legacy_sql
     self.flatten_results = flatten_results
 
-    if self.source.query is None:
+    if self.source.table_reference is not None:
       # If table schema did not define a project we default to executing
       # project.
       project_id = self.source.table_reference.projectId
@@ -656,30 +656,44 @@ def __init__(self, source, test_bigquery_client=None, 
use_legacy_sql=True,
           project_id,
           self.source.table_reference.datasetId,
           self.source.table_reference.tableId)
-    else:
+    elif self.source.query is not None:
       self.query = self.source.query
+    else:
+      # Enforce the "modes" enforced by BigQuerySource.__init__.
+      # If this exception has been raised, the BigQuerySource "modes" have
+      # changed and this method will need to be updated as well.
+      raise ValueError("BigQuerySource must have either a table or query")
 
-  def _get_source_table_location(self):
-    tr = self.source.table_reference
-    if tr is None:
-      # TODO: implement location retrieval for query sources
-      return
+  def _get_source_location(self):
+    """
+    Get the source location (e.g. ``"EU"`` or ``"US"``) from either
 
-    if tr.projectId is None:
-      source_project_id = self.executing_project
-    else:
-      source_project_id = tr.projectId
+    - :data:`source.table_reference`
+      or
+    - The first referenced table in :data:`source.query`
 
-    source_dataset_id = tr.datasetId
-    source_table_id = tr.tableId
-    source_location = self.client.get_table_location(
-        source_project_id, source_dataset_id, source_table_id)
-    return source_location
+    See Also:
+      - :meth:`BigQueryWrapper.get_query_location`
+      - :meth:`BigQueryWrapper.get_table_location`
+
+    Returns:
+      Optional[str]: The source location, if any.
+    """
+    if self.source.table_reference is not None:
+      tr = self.source.table_reference
+      return self.client.get_table_location(
+          tr.projectId if tr.projectId is not None else self.executing_project,
+          tr.datasetId, tr.tableId)
+    else:  # It's a query source
+      return self.client.get_query_location(
+          self.executing_project,
+          self.source.query,
+          self.source.use_legacy_sql)
 
   def __enter__(self):
     self.client = BigQueryWrapper(client=self.test_bigquery_client)
     self.client.create_temporary_dataset(
-        self.executing_project, location=self._get_source_table_location())
+        self.executing_project, location=self._get_source_location())
     return self
 
   def __exit__(self, exception_type, exception_value, traceback):
@@ -799,6 +813,53 @@ def _get_temp_table(self, project_id):
         dataset=BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix,
         project=project_id)
 
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def get_query_location(self, project_id, query, use_legacy_sql):
+    """
+    Get the location of tables referenced in a query.
+
+    This method returns the location of the first referenced table in the query
+    and depends on the BigQuery service to provide error handling for
+    queries that reference tables in multiple locations.
+    """
+    reference = bigquery.JobReference(jobId=uuid.uuid4().hex,
+                                      projectId=project_id)
+    request = bigquery.BigqueryJobsInsertRequest(
+        projectId=project_id,
+        job=bigquery.Job(
+            configuration=bigquery.JobConfiguration(
+                dryRun=True,
+                query=bigquery.JobConfigurationQuery(
+                    query=query,
+                    useLegacySql=use_legacy_sql,
+                )),
+            jobReference=reference))
+
+    response = self.client.jobs.Insert(request)
+
+    if response.statistics is None:
+      # This behavior is only expected in tests
+      logging.warning(
+          "Unable to get location, missing response.statistics. Query: %s",
+          query)
+      return None
+
+    referenced_tables = response.statistics.query.referencedTables
+    if referenced_tables:  # Guards against both empty and None
+      table = referenced_tables[0]
+      location = self.get_table_location(
+          table.projectId,
+          table.datasetId,
+          table.tableId)
+      logging.info("Using location %r from table %r referenced by query %s",
+                   location, table, query)
+      return location
+
+    logging.debug("Query %s does not reference any tables.", query)
+    return None
+
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
@@ -949,8 +1010,7 @@ def get_table_location(self, project_id, dataset_id, 
table_id):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def create_temporary_dataset(self, project_id, location=None):
-    # TODO: make location required, once "query" locations can be determined
+  def create_temporary_dataset(self, project_id, location):
     dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
     # Check if dataset exists to make sure that the temporary id is unique
     try:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b155244d64b..ffd6a6f0474 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -826,7 +826,7 @@ def test_temporary_dataset_is_unique(self, 
patched_time_sleep):
             projectId='project_id', datasetId='dataset_id'))
     wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client)
     with self.assertRaises(RuntimeError):
-      wrapper.create_temporary_dataset('project_id')
+      wrapper.create_temporary_dataset('project_id', 'location')
     self.assertTrue(client.datasets.Get.called)
 
   def test_get_or_create_dataset_created(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152445)
    Time Spent: 4h 40m  (was: 4.5h)

> BigQuerySource(query=...) in DirectRunner creates temp dataset in the wrong 
> location
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-5457
>                 URL: https://issues.apache.org/jira/browse/BEAM-5457
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.6.0
>            Reporter: Joar Wandborg
>            Assignee: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> I'm in the EU. If I have a
>  
> {code:java}
> BigQuerySource(
>     query="SELECT x, y FROM `my-other-project.mydataset.my_european_table`",
>     project="myproject",
>     use_standard_sql=True
> ){code}
> And then run the Pipeline through the DirectRunner I get the following 
> warning and error:
> {noformat}
> 2018-09-21 11:39:52,620 WARNING root create_temporary_dataset
> Dataset myproject:temp_dataset_0bbb28f014a24225b668a67341f4f71e does not 
> exist so we will create it as temporary with location=None {noformat}
> {noformat}
> HttpBadRequestError: HttpError accessing 
> <https://www.googleapis.com/bigquery/v2/projects/myproject/queries/xyz123?alt=json&maxResults=10000>:
>  response: <{'status': '400', 'content-length': '354', 'x-xss-protection': 
> '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding': 
> 'chunked', 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF', 
> '-content-encoding': 'gzip', 'cache-control': 'private', 'date': 'Fri, 21 Sep 
> 2018 09:39:55 GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc': 'quic=":443"; 
> ma=2592000; v="44,43,39,35"', 'content-type': 'application/json; 
> charset=UTF-8'}>, content <{
>   "error": {
>     "code": 400,
>     "message": "Cannot read and write in different locations: source: EU, 
> destination: US",
>     "errors": [
>       {
>         "message": "Cannot read and write in different locations: source: EU, 
> destination: US",
>         "domain": "global",
>         "reason": "invalid"
>       }
>     ],
>     "status": "INVALID_ARGUMENT"
>   }
> {noformat}
> There's a TODO in the code that looks very related: 
> [https://github.com/apache/beam/blob/d691a86b8fd082efd0fd71c3cb58b7d61442717d/sdks/python/apache_beam/io/gcp/bigquery.py#L665|https://github.com/apache/beam/blob/d691a86b8fd082efd0fd71c3cb58b7d61442717d/sdks/python/apache_beam/io/gcp/bigquery.py#L665]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to