This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ea6f41  [BEAM-7814] Wait for BQ query results in tests
     new 5c6f428  Merge pull request #9150 from udim/bq-test-client-wait
8ea6f41 is described below

commit 8ea6f415652fddc6cbb9bbce5b15b255fc47add3
Author: Udi Meiri <eh...@google.com>
AuthorDate: Wed Jul 24 18:10:03 2019 -0700

    [BEAM-7814] Wait for BQ query results in tests
---
 .../apache_beam/io/gcp/tests/bigquery_matcher.py      | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 79e3389..91c2bce 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -135,17 +135,12 @@ class BigqueryFullResultMatcher(BaseMatcher):
     self.project = project
     self.query = query
     self.expected_data = data
+    self.actual_data = None
 
   def _matches(self, _):
-    logging.info('Start verify Bigquery data.')
-    # Run query
-    bigquery_client = bigquery.Client(project=self.project)
-    response = self._get_query_result(bigquery_client)
-    logging.info('Read from given query (%s), total rows %d',
-                 self.query, len(response))
-    logging.info('Response from BigQuery is %r', response)
-
-    self.actual_data = response
+    if self.actual_data is None:
+      bigquery_client = bigquery.Client(project=self.project)
+      self.actual_data = self._get_query_result(bigquery_client)
 
     # Verify result
     try:
@@ -163,9 +158,9 @@ class BigqueryFullResultMatcher(BaseMatcher):
   def _query_with_retry(self, bigquery_client):
     """Run Bigquery query with retry if got error http response"""
     logging.info('Attempting to perform query %s to BQ', self.query)
-    query_job = bigquery_client.query(self.query)
-    logging.info('Result of query is: %r', query_job)
-    return [row.values() for row in query_job]
+    rows = bigquery_client.query(self.query).result(timeout=60)
+    logging.info('Result of query is: %r', rows)
+    return [row.values() for row in rows]
 
   def describe_to(self, description):
     description \

Reply via email to