Repository: beam Updated Branches: refs/heads/master d919394c7 -> a9fdc3bc4
We shouldn't write to re-created tables for 2 mins Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/483abc09 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/483abc09 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/483abc09 Branch: refs/heads/master Commit: 483abc0941f0fb42c506565f6912153296fd94b5 Parents: d919394 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Mon Jul 24 15:54:02 2017 -0700 Committer: chamik...@google.com <chamik...@google.com> Committed: Tue Jul 25 22:58:23 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/bigquery.py | 19 +++++++++++++++---- sdks/python/apache_beam/io/gcp/bigquery_test.py | 3 ++- 2 files changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 23fd310..db6715a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1002,12 +1002,23 @@ class BigQueryWrapper(object): if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE: return found_table else: + created_table = self._create_table(project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + schema=schema or found_table.schema) # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete # the table before this point. - return self._create_table(project_id=project_id, - dataset_id=dataset_id, - table_id=table_id, - schema=schema or found_table.schema) + if write_disposition == BigQueryDisposition.WRITE_TRUNCATE: + # BigQuery can route data to the old table for 2 mins max so wait + # that much time before creating the table and writing it + logging.warning('Sleeping for 150 seconds before the write as ' + + 'BigQuery inserts can be routed to deleted table ' + + 'for 2 mins after the delete and create.') + # TODO(BEAM-2673): Remove this sleep by migrating to load api + time.sleep(150) + return created_table + else: + return created_table def run_query(self, project_id, query, use_legacy_sql, flatten_results, dry_run=False): http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 14247ba..bfd06ac 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -650,7 +650,8 @@ class TestBigQueryWriter(unittest.TestCase): self.assertFalse(client.tables.Delete.called) self.assertFalse(client.tables.Insert.called) - def test_table_with_write_disposition_truncate(self): + @mock.patch('time.sleep', return_value=None) + def test_table_with_write_disposition_truncate(self, _patched_sleep): client = mock.Mock() table = bigquery.Table( tableReference=bigquery.TableReference(