[BEAM-1218] Move GCP specific IO into separate module
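With this change the GCP-specific IO modules live under apache_beam.io.google_cloud_platform, while apache_beam.io keeps re-exporting the BigQuery and Pub/Sub symbols through its wildcard imports (see the io/__init__.py hunk below), so beam.io.BigQuerySource and friends keep working. A minimal, illustrative sketch of the import paths before and after the move:

  # Before this commit:
  #   from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
  #   from apache_beam.io.bigquery import BigQuerySource
  # After this commit:
  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
  from apache_beam.io.google_cloud_platform.bigquery import BigQuerySource

  # Unchanged: the re-exports in apache_beam.io still make the same classes
  # available as beam.io.BigQuerySource, beam.io.BigQuerySink, etc.
  import apache_beam as beam
  source = beam.io.BigQuerySource('DATASET.TABLE')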
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/908c8532
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/908c8532
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/908c8532

Branch: refs/heads/master
Commit: 908c85327ce17a0ed64401d1e86cb86396284fdb
Parents: 2872f86
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Sat Feb 18 22:52:06 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Sun Feb 19 16:21:25 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |    4 +-
 sdks/python/apache_beam/io/__init__.py          |    4 +-
 sdks/python/apache_beam/io/bigquery.py          | 1082 ------------------
 sdks/python/apache_beam/io/bigquery_test.py     |  812 -------------
 .../python/apache_beam/io/datastore/__init__.py |   16 -
 .../apache_beam/io/datastore/v1/__init__.py     |   16 -
 .../apache_beam/io/datastore/v1/datastoreio.py  |  391 -------
 .../io/datastore/v1/datastoreio_test.py         |  237 ----
 .../io/datastore/v1/fake_datastore.py           |   92 --
 .../apache_beam/io/datastore/v1/helper.py       |  267 -----
 .../apache_beam/io/datastore/v1/helper_test.py  |  256 -----
 .../io/datastore/v1/query_splitter.py           |  269 -----
 .../io/datastore/v1/query_splitter_test.py      |  201 ----
 sdks/python/apache_beam/io/fileio.py            |    2 +-
 sdks/python/apache_beam/io/gcsio.py             |  871 --------------
 sdks/python/apache_beam/io/gcsio_test.py        |  786 -------------
 .../io/google_cloud_platform/bigquery.py        | 1082 ++++++++++++++++++
 .../io/google_cloud_platform/bigquery_test.py   |  813 +++++++++++++
 .../google_cloud_platform/datastore/__init__.py |   16 +
 .../datastore/v1/__init__.py                    |   16 +
 .../datastore/v1/datastoreio.py                 |  391 +++++++
 .../datastore/v1/datastoreio_test.py            |  237 ++++
 .../datastore/v1/fake_datastore.py              |   92 ++
 .../datastore/v1/helper.py                      |  267 +++++
 .../datastore/v1/helper_test.py                 |  256 +++++
 .../datastore/v1/query_splitter.py              |  269 +++++
 .../datastore/v1/query_splitter_test.py         |  201 ++++
 .../io/google_cloud_platform/gcsio.py           |  871 ++++++++++++++
 .../io/google_cloud_platform/gcsio_test.py      |  786 +++++++++++++
 .../io/google_cloud_platform/pubsub.py          |   91 ++
 .../io/google_cloud_platform/pubsub_test.py     |   63 +
 sdks/python/apache_beam/io/pubsub.py            |   91 --
 sdks/python/apache_beam/io/pubsub_test.py       |   62 -
 33 files changed, 5456 insertions(+), 5454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 6f081df..e7f28b0 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -867,8 +867,8 @@ def model_datastoreio():
   import googledatastore
   import apache_beam as beam
   from apache_beam.utils.pipeline_options import PipelineOptions
-  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
-  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import WriteToDatastore

   project = 'my_project'
   kind = 'my_kind'

http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/__init__.py
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 13ce36f..972ed53 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -19,13 +19,13 @@ # pylint: disable=wildcard-import from apache_beam.io.avroio import * -from apache_beam.io.bigquery import * from apache_beam.io.fileio import * from apache_beam.io.iobase import Read from apache_beam.io.iobase import Sink from apache_beam.io.iobase import Write from apache_beam.io.iobase import Writer -from apache_beam.io.pubsub import * from apache_beam.io.textio import * from apache_beam.io.tfrecordio import * from apache_beam.io.range_trackers import * +from apache_beam.io.google_cloud_platform.bigquery import * +from apache_beam.io.google_cloud_platform.pubsub import * http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py deleted file mode 100644 index 3beecea..0000000 --- a/sdks/python/apache_beam/io/bigquery.py +++ /dev/null @@ -1,1082 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""BigQuery sources and sinks. - -This module implements reading from and writing to BigQuery tables. It relies -on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, -TableRow, and TableCell. The default mode is to return table rows read from a -BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink -accepts PCollections of dictionaries. This is done for more convenient -programming. If desired, the native TableRow objects can be used throughout to -represent rows (use an instance of TableRowJsonCoder as a coder argument when -creating the sources or sinks respectively). - -Also, for programming convenience, instances of TableReference and TableSchema -have a string representation that can be used for the corresponding arguments: - - - TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string. - - TableSchema can be a NAME:TYPE{,NAME:TYPE}* string - (e.g. 'month:STRING,event_count:INTEGER'). - -The syntax supported is described here: -https://cloud.google.com/bigquery/bq-command-line-tool-quickstart - -BigQuery sources can be used as main inputs or side inputs. A main input -(common case) is expected to be massive and will be split into manageable chunks -and processed in parallel. Side inputs are expected to be small and will be read -completely every time a ParDo DoFn gets executed. 
In the example below the -lambda function implementing the DoFn for the Map transform will get on each -call *one* row of the main table and *all* rows of the side table. The runner -may use some caching techniques to share the side inputs between calls in order -to avoid excessive reading: - - main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource() - side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() - results = ( - main_table - | 'process data' >> beam.Map( - lambda element, side_input: ..., AsList(side_table))) - -There is no difference in how main and side inputs are read. What makes the -side_table a 'side input' is the AsList wrapper used when passing the table -as a parameter to the Map transform. AsList signals to the execution framework -that its input should be made available whole. - -The main and side inputs are implemented differently. Reading a BigQuery table -as main input entails exporting the table to a set of GCS files (currently in -JSON format) and then processing those files. Reading the same table as a side -input entails querying the table for all its rows. The coder argument on -BigQuerySource controls the reading of the lines in the export files (i.e., -transform a JSON object into a PCollection element). The coder is not involved -when the same table is read as a side input since there is no intermediate -format involved. We get the table rows directly from the BigQuery service with -a query. - -Users may provide a query to read from rather than reading all of a BigQuery -table. If specified, the result obtained by executing the specified query will -be used as the data of the input transform. - - query_results = pipeline | beam.io.Read(beam.io.BigQuerySource( - query='SELECT year, mean_temp FROM samples.weather_stations')) - -When creating a BigQuery input transform, users should provide either a query -or a table. Pipeline construction will fail with a validation error if neither -or both are specified. - -*** Short introduction to BigQuery concepts *** -Tables have rows (TableRow) and each row has cells (TableCell). -A table has a schema (TableSchema), which in turn describes the schema of each -cell (TableFieldSchema). The terms field and cell are used interchangeably. - -TableSchema: Describes the schema (types and order) for values in each row. - Has one attribute, 'field', which is list of TableFieldSchema objects. - -TableFieldSchema: Describes the schema (type, name) for one field. - Has several attributes, including 'name' and 'type'. Common values for - the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible - values are described at: - https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes - -TableRow: Holds all values in a table row. Has one attribute, 'f', which is a - list of TableCell instances. - -TableCell: Holds the value for one cell (or field). Has one attribute, - 'v', which is a JsonValue instance. This class is defined in - apitools.base.py.extra_types.py module. 
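# A minimal, illustrative sketch of the string shorthands described above
# (project, dataset and table names here are placeholders):
import apache_beam as beam

p = beam.Pipeline()
rows = p | 'read' >> beam.io.Read(
    beam.io.BigQuerySource('my_project:my_dataset.my_input_table'))
rows | 'write' >> beam.io.Write(beam.io.BigQuerySink(
    'my_dataset.my_output_table',
    schema='month:STRING,event_count:INTEGER',
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
p.run()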
-""" - -from __future__ import absolute_import - -import collections -import datetime -import json -import logging -import re -import time -import uuid - -from apitools.base.py.exceptions import HttpError - -from apache_beam import coders -from apache_beam.internal import auth -from apache_beam.internal.json_value import from_json_value -from apache_beam.internal.json_value import to_json_value -from apache_beam.runners.google_cloud_dataflow.native_io import iobase as dataflow_io -from apache_beam.transforms.display import DisplayDataItem -from apache_beam.utils import retry -from apache_beam.utils.pipeline_options import GoogleCloudOptions - -# Protect against environments where bigquery library is not available. -# pylint: disable=wrong-import-order, wrong-import-position -try: - from apache_beam.io.google_cloud_platform.internal.clients import bigquery -except ImportError: - pass -# pylint: enable=wrong-import-order, wrong-import-position - - -__all__ = [ - 'TableRowJsonCoder', - 'BigQueryDisposition', - 'BigQuerySource', - 'BigQuerySink', - ] - -JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.' -MAX_RETRIES = 3 - - -class RowAsDictJsonCoder(coders.Coder): - """A coder for a table row (represented as a dict) to/from a JSON string. - - This is the default coder for sources and sinks if the coder argument is not - specified. - """ - - def encode(self, table_row): - # The normal error when dumping NAN/INF values is: - # ValueError: Out of range float values are not JSON compliant - # This code will catch this error to emit an error that explains - # to the programmer that they have used NAN/INF values. - try: - return json.dumps(table_row, allow_nan=False) - except ValueError as e: - raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) - - def decode(self, encoded_table_row): - return json.loads(encoded_table_row) - - -class TableRowJsonCoder(coders.Coder): - """A coder for a TableRow instance to/from a JSON string. - - Note that the encoding operation (used when writing to sinks) requires the - table schema in order to obtain the ordered list of field names. Reading from - sources on the other hand does not need the table schema. - """ - - def __init__(self, table_schema=None): - # The table schema is needed for encoding TableRows as JSON (writing to - # sinks) because the ordered list of field names is used in the JSON - # representation. - self.table_schema = table_schema - # Precompute field names since we need them for row encoding. - if self.table_schema: - self.field_names = tuple(fs.name for fs in self.table_schema.fields) - - def encode(self, table_row): - if self.table_schema is None: - raise AttributeError( - 'The TableRowJsonCoder requires a table schema for ' - 'encoding operations. Please specify a table_schema argument.') - try: - return json.dumps( - collections.OrderedDict( - zip(self.field_names, - [from_json_value(f.v) for f in table_row.f])), - allow_nan=False) - except ValueError as e: - raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) - - def decode(self, encoded_table_row): - od = json.loads( - encoded_table_row, object_pairs_hook=collections.OrderedDict) - return bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()]) - - -def parse_table_schema_from_json(schema_string): - """Parse the Table Schema provided as string. - - Args: - schema_string: String serialized table schema, should be a valid JSON. - - Returns: - A TableSchema of the BigQuery export from either the Query or the Table. 
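# A minimal usage sketch for parse_table_schema_from_json; the JSON string
# below is a placeholder schema with one string and one integer field:
schema = parse_table_schema_from_json(
    '{"fields": ['
    '{"name": "s", "type": "STRING", "mode": "NULLABLE"}, '
    '{"name": "n", "type": "INTEGER", "mode": "REQUIRED"}]}')
# The result is a bigquery.TableSchema whose entries are TableFieldSchema
# instances; it can be passed directly as the schema argument of BigQuerySink.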
- """ - json_schema = json.loads(schema_string) - - def _parse_schema_field(field): - """Parse a single schema field from dictionary. - - Args: - field: Dictionary object containing serialized schema. - - Returns: - A TableFieldSchema for a single column in BigQuery. - """ - schema = bigquery.TableFieldSchema() - schema.name = field['name'] - schema.type = field['type'] - if 'mode' in field: - schema.mode = field['mode'] - else: - schema.mode = 'NULLABLE' - if 'description' in field: - schema.description = field['description'] - if 'fields' in field: - schema.fields = [_parse_schema_field(x) for x in field['fields']] - return schema - - fields = [_parse_schema_field(f) for f in json_schema['fields']] - return bigquery.TableSchema(fields=fields) - - -class BigQueryDisposition(object): - """Class holding standard strings used for create and write dispositions.""" - - CREATE_NEVER = 'CREATE_NEVER' - CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' - WRITE_TRUNCATE = 'WRITE_TRUNCATE' - WRITE_APPEND = 'WRITE_APPEND' - WRITE_EMPTY = 'WRITE_EMPTY' - - @staticmethod - def validate_create(disposition): - values = (BigQueryDisposition.CREATE_NEVER, - BigQueryDisposition.CREATE_IF_NEEDED) - if disposition not in values: - raise ValueError( - 'Invalid create disposition %s. Expecting %s' % (disposition, values)) - return disposition - - @staticmethod - def validate_write(disposition): - values = (BigQueryDisposition.WRITE_TRUNCATE, - BigQueryDisposition.WRITE_APPEND, - BigQueryDisposition.WRITE_EMPTY) - if disposition not in values: - raise ValueError( - 'Invalid write disposition %s. Expecting %s' % (disposition, values)) - return disposition - - -def _parse_table_reference(table, dataset=None, project=None): - """Parses a table reference into a (project, dataset, table) tuple. - - Args: - table: The ID of the table. The ID must contain only letters - (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is None - then the table argument must contain the entire table reference: - 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a - bigquery.TableReference instance in which case dataset and project are - ignored and the reference is returned as a result. Additionally, for date - partitioned tables, appending '$YYYYmmdd' to the table name is supported, - e.g. 'DATASET.TABLE$YYYYmmdd'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table (and possibly dataset) - argument. - - Returns: - A bigquery.TableReference object. The object has the following attributes: - projectId, datasetId, and tableId. - - Raises: - ValueError: if the table reference as a string does not match the expected - format. - """ - - if isinstance(table, bigquery.TableReference): - return table - - table_reference = bigquery.TableReference() - # If dataset argument is not specified, the expectation is that the - # table argument will contain a full table reference instead of just a - # table name. - if dataset is None: - match = re.match( - r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table) - if not match: - raise ValueError( - 'Expected a table reference (PROJECT:DATASET.TABLE or ' - 'DATASET.TABLE) instead of %s.' 
% table) - table_reference.projectId = match.group('project') - table_reference.datasetId = match.group('dataset') - table_reference.tableId = match.group('table') - else: - table_reference.projectId = project - table_reference.datasetId = dataset - table_reference.tableId = table - return table_reference - - -# ----------------------------------------------------------------------------- -# BigQuerySource, BigQuerySink. - - -class BigQuerySource(dataflow_io.NativeSource): - """A source based on a BigQuery table.""" - - def __init__(self, table=None, dataset=None, project=None, query=None, - validate=False, coder=None, use_standard_sql=False, - flatten_results=True): - """Initialize a BigQuerySource. - - Args: - table: The ID of a BigQuery table. If specified all data of the table - will be used as input of the current source. The ID must contain only - letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset - and query arguments are None then the table argument must contain the - entire table reference specified as: 'DATASET.TABLE' or - 'PROJECT:DATASET.TABLE'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument or a query is - specified. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table argument or a query is - specified. - query: A query to be used instead of arguments table, dataset, and - project. - validate: If true, various checks will be done when source gets - initialized (e.g., is table present?). This should be True for most - scenarios in order to catch errors as early as possible (pipeline - construction instead of pipeline execution). It should be False if the - table is created during pipeline execution by a previous step. - coder: The coder for the table rows if serialized to disk. If None, then - the default coder is RowAsDictJsonCoder, which will interpret every line - in a file as a JSON serialized dictionary. This argument needs a value - only in special cases when returning table rows as dictionaries is not - desirable. - use_standard_sql: Specifies whether to use BigQuery's standard - SQL dialect for this query. The default value is False. If set to True, - the query will use BigQuery's updated SQL dialect with improved - standards compliance. This parameter is ignored for table inputs. - flatten_results: Flattens all nested and repeated fields in the - query results. The default value is true. - - Raises: - ValueError: if any of the following is true - (1) the table reference as a string does not match the expected format - (2) neither a table nor a query is specified - (3) both a table and a query is specified. - """ - - if table is not None and query is not None: - raise ValueError('Both a BigQuery table and a query were specified.' 
- ' Please specify only one of these.') - elif table is None and query is None: - raise ValueError('A BigQuery table or a query must be specified') - elif table is not None: - self.table_reference = _parse_table_reference(table, dataset, project) - self.query = None - self.use_legacy_sql = True - else: - self.query = query - # TODO(BEAM-1082): Change the internal flag to be standard_sql - self.use_legacy_sql = not use_standard_sql - self.table_reference = None - - self.validate = validate - self.flatten_results = flatten_results - self.coder = coder or RowAsDictJsonCoder() - - def display_data(self): - if self.query is not None: - res = {'query': DisplayDataItem(self.query, label='Query')} - else: - if self.table_reference.projectId is not None: - tableSpec = '{}:{}.{}'.format(self.table_reference.projectId, - self.table_reference.datasetId, - self.table_reference.tableId) - else: - tableSpec = '{}.{}'.format(self.table_reference.datasetId, - self.table_reference.tableId) - res = {'table': DisplayDataItem(tableSpec, label='Table')} - - res['validation'] = DisplayDataItem(self.validate, - label='Validation Enabled') - return res - - @property - def format(self): - """Source format name required for remote execution.""" - return 'bigquery' - - def reader(self, test_bigquery_client=None): - return BigQueryReader( - source=self, - test_bigquery_client=test_bigquery_client, - use_legacy_sql=self.use_legacy_sql, - flatten_results=self.flatten_results) - - -class BigQuerySink(dataflow_io.NativeSink): - """A sink based on a BigQuery table.""" - - def __init__(self, table, dataset=None, project=None, schema=None, - create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=BigQueryDisposition.WRITE_EMPTY, - validate=False, coder=None): - """Initialize a BigQuerySink. - - Args: - table: The ID of the table. The ID must contain only letters - (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is - None then the table argument must contain the entire table reference - specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table argument. - schema: The schema to be used if the BigQuery table to write has to be - created. This can be either specified as a 'bigquery.TableSchema' object - or a single string of the form 'field1:type1,field2:type2,field3:type3' - that defines a comma separated list of fields. Here 'type' should - specify the BigQuery type of the field. Single string based schemas do - not support nested fields, repeated fields, or specifying a BigQuery - mode for fields (mode will always be set to 'NULLABLE'). - create_disposition: A string describing what happens if the table does not - exist. Possible values are: - - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist. - - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist. - write_disposition: A string describing what happens if the table has - already some data. Possible values are: - - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows. - - BigQueryDisposition.WRITE_APPEND: add to existing rows. - - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty. - validate: If true, various checks will be done when sink gets - initialized (e.g., is table present given the disposition arguments?). 
- This should be True for most scenarios in order to catch errors as early - as possible (pipeline construction instead of pipeline execution). It - should be False if the table is created during pipeline execution by a - previous step. - coder: The coder for the table rows if serialized to disk. If None, then - the default coder is RowAsDictJsonCoder, which will interpret every - element written to the sink as a dictionary that will be JSON serialized - as a line in a file. This argument needs a value only in special cases - when writing table rows as dictionaries is not desirable. - - Raises: - TypeError: if the schema argument is not a string or a TableSchema object. - ValueError: if the table reference as a string does not match the expected - format. - """ - self.table_reference = _parse_table_reference(table, dataset, project) - # Transform the table schema into a bigquery.TableSchema instance. - if isinstance(schema, basestring): - # TODO(silviuc): Should add a regex-based validation of the format. - table_schema = bigquery.TableSchema() - schema_list = [s.strip(' ') for s in schema.split(',')] - for field_and_type in schema_list: - field_name, field_type = field_and_type.split(':') - field_schema = bigquery.TableFieldSchema() - field_schema.name = field_name - field_schema.type = field_type - field_schema.mode = 'NULLABLE' - table_schema.fields.append(field_schema) - self.table_schema = table_schema - elif schema is None: - # TODO(silviuc): Should check that table exists if no schema specified. - self.table_schema = schema - elif isinstance(schema, bigquery.TableSchema): - self.table_schema = schema - else: - raise TypeError('Unexpected schema argument: %s.' % schema) - - self.create_disposition = BigQueryDisposition.validate_create( - create_disposition) - self.write_disposition = BigQueryDisposition.validate_write( - write_disposition) - self.validate = validate - self.coder = coder or RowAsDictJsonCoder() - - def display_data(self): - res = {} - if self.table_reference is not None: - tableSpec = '{}.{}'.format(self.table_reference.datasetId, - self.table_reference.tableId) - if self.table_reference.projectId is not None: - tableSpec = '{}:{}'.format(self.table_reference.projectId, - tableSpec) - res['table'] = DisplayDataItem(tableSpec, label='Table') - - res['validation'] = DisplayDataItem(self.validate, - label="Validation Enabled") - return res - - def schema_as_json(self): - """Returns the TableSchema associated with the sink as a JSON string.""" - - def schema_list_as_object(schema_list): - """Returns a list of TableFieldSchema objects as a list of dicts.""" - fields = [] - for f in schema_list: - fs = {'name': f.name, 'type': f.type} - if f.description is not None: - fs['description'] = f.description - if f.mode is not None: - fs['mode'] = f.mode - if f.type.lower() == 'record': - fs['fields'] = schema_list_as_object(f.fields) - fields.append(fs) - return fields - return json.dumps( - {'fields': schema_list_as_object(self.table_schema.fields)}) - - @property - def format(self): - """Sink format name required for remote execution.""" - return 'bigquery' - - def writer(self, test_bigquery_client=None, buffer_size=None): - return BigQueryWriter( - sink=self, test_bigquery_client=test_bigquery_client, - buffer_size=buffer_size) - - -# ----------------------------------------------------------------------------- -# BigQueryReader, BigQueryWriter. 
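# A minimal, illustrative sketch of driving the reader below by hand (a
# runner normally does this); 'client' stands in for a BigQuery client or
# test stub and 'process' is a placeholder:
source = BigQuerySource('my_dataset.my_table')
with source.reader(test_bigquery_client=client) as reader:
  for row in reader:
    # With the default RowAsDictJsonCoder each row is returned as a dict.
    process(row)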
- - -class BigQueryReader(dataflow_io.NativeSourceReader): - """A reader for a BigQuery source.""" - - def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True, - flatten_results=True): - self.source = source - self.test_bigquery_client = test_bigquery_client - if auth.is_running_in_gce: - self.executing_project = auth.executing_project - elif hasattr(source, 'pipeline_options'): - self.executing_project = ( - source.pipeline_options.view_as(GoogleCloudOptions).project) - else: - self.executing_project = None - - # TODO(silviuc): Try to automatically get it from gcloud config info. - if not self.executing_project and test_bigquery_client is None: - raise RuntimeError( - 'Missing executing project information. Please use the --project ' - 'command line option to specify it.') - self.row_as_dict = isinstance(self.source.coder, RowAsDictJsonCoder) - # Schema for the rows being read by the reader. It is initialized the - # first time something gets read from the table. It is not required - # for reading the field values in each row but could be useful for - # getting additional details. - self.schema = None - self.use_legacy_sql = use_legacy_sql - self.flatten_results = flatten_results - - if self.source.query is None: - # If table schema did not define a project we default to executing - # project. - project_id = self.source.table_reference.projectId - if not project_id: - project_id = self.executing_project - self.query = 'SELECT * FROM [%s:%s.%s];' % ( - project_id, - self.source.table_reference.datasetId, - self.source.table_reference.tableId) - else: - self.query = self.source.query - - def __enter__(self): - self.client = BigQueryWrapper(client=self.test_bigquery_client) - self.client.create_temporary_dataset(self.executing_project) - return self - - def __exit__(self, exception_type, exception_value, traceback): - self.client.clean_up_temporary_dataset(self.executing_project) - - def __iter__(self): - for rows, schema in self.client.run_query( - project_id=self.executing_project, query=self.query, - use_legacy_sql=self.use_legacy_sql, - flatten_results=self.flatten_results): - if self.schema is None: - self.schema = schema - for row in rows: - if self.row_as_dict: - yield self.client.convert_row_to_dict(row, schema) - else: - yield row - - -class BigQueryWriter(dataflow_io.NativeSinkWriter): - """The sink writer for a BigQuerySink.""" - - def __init__(self, sink, test_bigquery_client=None, buffer_size=None): - self.sink = sink - self.test_bigquery_client = test_bigquery_client - self.row_as_dict = isinstance(self.sink.coder, RowAsDictJsonCoder) - # Buffer used to batch written rows so we reduce communication with the - # BigQuery service. - self.rows_buffer = [] - self.rows_buffer_flush_threshold = buffer_size or 1000 - # Figure out the project, dataset, and table used for the sink. - self.project_id = self.sink.table_reference.projectId - - # If table schema did not define a project we default to executing project. 
- if self.project_id is None and hasattr(sink, 'pipeline_options'): - self.project_id = ( - sink.pipeline_options.view_as(GoogleCloudOptions).project) - - assert self.project_id is not None - - self.dataset_id = self.sink.table_reference.datasetId - self.table_id = self.sink.table_reference.tableId - - def _flush_rows_buffer(self): - if self.rows_buffer: - logging.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer), - self.project_id, self.dataset_id, self.table_id) - passed, errors = self.client.insert_rows( - project_id=self.project_id, dataset_id=self.dataset_id, - table_id=self.table_id, rows=self.rows_buffer) - self.rows_buffer = [] - if not passed: - raise RuntimeError('Could not successfully insert rows to BigQuery' - ' table [%s:%s.%s]. Errors: %s'% - (self.project_id, self.dataset_id, - self.table_id, errors)) - - def __enter__(self): - self.client = BigQueryWrapper(client=self.test_bigquery_client) - self.client.get_or_create_table( - self.project_id, self.dataset_id, self.table_id, self.sink.table_schema, - self.sink.create_disposition, self.sink.write_disposition) - return self - - def __exit__(self, exception_type, exception_value, traceback): - self._flush_rows_buffer() - - def Write(self, row): - self.rows_buffer.append(row) - if len(self.rows_buffer) > self.rows_buffer_flush_threshold: - self._flush_rows_buffer() - - -# ----------------------------------------------------------------------------- -# BigQueryWrapper. - - -class BigQueryWrapper(object): - """BigQuery client wrapper with utilities for querying. - - The wrapper is used to organize all the BigQuery integration points and - offer a common place where retry logic for failures can be controlled. - In addition it offers various functions used both in sources and sinks - (e.g., find and create tables, query a table, etc.). - """ - - TEMP_TABLE = 'temp_table_' - TEMP_DATASET = 'temp_dataset_' - - def __init__(self, client=None): - self.client = client or bigquery.BigqueryV2( - credentials=auth.get_service_credentials()) - self._unique_row_id = 0 - # For testing scenarios where we pass in a client we do not want a - # randomized prefix for row IDs. - self._row_id_prefix = '' if client else uuid.uuid4() - self._temporary_table_suffix = uuid.uuid4().hex - - @property - def unique_row_id(self): - """Returns a unique row ID (str) used to avoid multiple insertions. - - If the row ID is provided, BigQuery will make a best effort to not insert - the same row multiple times for fail and retry scenarios in which the insert - request may be issued several times. This comes into play for sinks executed - in a local runner. 
- - Returns: - a unique row ID string - """ - self._unique_row_id += 1 - return '%s_%d' % (self._row_id_prefix, self._unique_row_id) - - def _get_temp_table(self, project_id): - return _parse_table_reference( - table=BigQueryWrapper.TEMP_TABLE + self._temporary_table_suffix, - dataset=BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix, - project=project_id) - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _start_query_job(self, project_id, query, use_legacy_sql, flatten_results, - job_id, dry_run=False): - reference = bigquery.JobReference(jobId=job_id, projectId=project_id) - request = bigquery.BigqueryJobsInsertRequest( - projectId=project_id, - job=bigquery.Job( - configuration=bigquery.JobConfiguration( - dryRun=dry_run, - query=bigquery.JobConfigurationQuery( - query=query, - useLegacySql=use_legacy_sql, - allowLargeResults=True, - destinationTable=self._get_temp_table(project_id), - flattenResults=flatten_results)), - jobReference=reference)) - - response = self.client.jobs.Insert(request) - return response.jobReference.jobId - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _get_query_results(self, project_id, job_id, - page_token=None, max_results=10000): - request = bigquery.BigqueryJobsGetQueryResultsRequest( - jobId=job_id, pageToken=page_token, projectId=project_id, - maxResults=max_results) - response = self.client.jobs.GetQueryResults(request) - return response - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_filter) - def _insert_all_rows(self, project_id, dataset_id, table_id, rows): - # The rows argument is a list of - # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as - # required by the InsertAll() method. - request = bigquery.BigqueryTabledataInsertAllRequest( - projectId=project_id, datasetId=dataset_id, tableId=table_id, - tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest( - # TODO(silviuc): Should have an option for skipInvalidRows? - # TODO(silviuc): Should have an option for ignoreUnknownValues? - rows=rows)) - response = self.client.tabledata.InsertAll(request) - # response.insertErrors is not [] if errors encountered. - return not response.insertErrors, response.insertErrors - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _get_table(self, project_id, dataset_id, table_id): - request = bigquery.BigqueryTablesGetRequest( - projectId=project_id, datasetId=dataset_id, tableId=table_id) - response = self.client.tables.Get(request) - # The response is a bigquery.Table instance. - return response - - def _create_table(self, project_id, dataset_id, table_id, schema): - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId=project_id, datasetId=dataset_id, tableId=table_id), - schema=schema) - request = bigquery.BigqueryTablesInsertRequest( - projectId=project_id, datasetId=dataset_id, table=table) - response = self.client.tables.Insert(request) - # The response is a bigquery.Table instance. 
- return response - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def get_or_create_dataset(self, project_id, dataset_id): - # Check if dataset already exists otherwise create it - try: - dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest( - projectId=project_id, datasetId=dataset_id)) - return dataset - except HttpError as exn: - if exn.status_code == 404: - dataset = bigquery.Dataset( - datasetReference=bigquery.DatasetReference( - projectId=project_id, datasetId=dataset_id)) - request = bigquery.BigqueryDatasetsInsertRequest( - projectId=project_id, dataset=dataset) - response = self.client.datasets.Insert(request) - # The response is a bigquery.Dataset instance. - return response - else: - raise - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _is_table_empty(self, project_id, dataset_id, table_id): - request = bigquery.BigqueryTabledataListRequest( - projectId=project_id, datasetId=dataset_id, tableId=table_id, - maxResults=1) - response = self.client.tabledata.List(request) - # The response is a bigquery.TableDataList instance. - return response.totalRows == 0 - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _delete_table(self, project_id, dataset_id, table_id): - request = bigquery.BigqueryTablesDeleteRequest( - projectId=project_id, datasetId=dataset_id, tableId=table_id) - try: - self.client.tables.Delete(request) - except HttpError as exn: - if exn.status_code == 404: - logging.warning('Table %s:%s.%s does not exist', project_id, - dataset_id, table_id) - return - else: - raise - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _delete_dataset(self, project_id, dataset_id, delete_contents=True): - request = bigquery.BigqueryDatasetsDeleteRequest( - projectId=project_id, datasetId=dataset_id, - deleteContents=delete_contents) - try: - self.client.datasets.Delete(request) - except HttpError as exn: - if exn.status_code == 404: - logging.warning('Dataaset %s:%s does not exist', project_id, - dataset_id) - return - else: - raise - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def create_temporary_dataset(self, project_id): - dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix - # Check if dataset exists to make sure that the temporary id is unique - try: - self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest( - projectId=project_id, datasetId=dataset_id)) - if project_id is not None: - # Unittests don't pass projectIds so they can be run without error - raise RuntimeError( - 'Dataset %s:%s already exists so cannot be used as temporary.' 
- % (project_id, dataset_id)) - except HttpError as exn: - if exn.status_code == 404: - logging.warning('Dataset does not exist so we will create it') - self.get_or_create_dataset(project_id, dataset_id) - else: - raise - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def clean_up_temporary_dataset(self, project_id): - temp_table = self._get_temp_table(project_id) - try: - self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest( - projectId=project_id, datasetId=temp_table.datasetId)) - except HttpError as exn: - if exn.status_code == 404: - logging.warning('Dataset %s:%s does not exist', project_id, - temp_table.datasetId) - return - else: - raise - self._delete_dataset(temp_table.projectId, temp_table.datasetId, True) - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def get_or_create_table( - self, project_id, dataset_id, table_id, schema, - create_disposition, write_disposition): - """Gets or creates a table based on create and write dispositions. - - The function mimics the behavior of BigQuery import jobs when using the - same create and write dispositions. - - Args: - project_id: The project id owning the table. - dataset_id: The dataset id owning the table. - table_id: The table id. - schema: A bigquery.TableSchema instance or None. - create_disposition: CREATE_NEVER or CREATE_IF_NEEDED. - write_disposition: WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. - - Returns: - A bigquery.Table instance if table was found or created. - - Raises: - RuntimeError: For various mismatches between the state of the table and - the create/write dispositions passed in. For example if the table is not - empty and WRITE_EMPTY was specified then an error will be raised since - the table was expected to be empty. - """ - found_table = None - try: - found_table = self._get_table(project_id, dataset_id, table_id) - except HttpError as exn: - if exn.status_code == 404: - if create_disposition == BigQueryDisposition.CREATE_NEVER: - raise RuntimeError( - 'Table %s:%s.%s not found but create disposition is CREATE_NEVER.' - % (project_id, dataset_id, table_id)) - else: - raise - - # If table exists already then handle the semantics for WRITE_EMPTY and - # WRITE_TRUNCATE write dispositions. - if found_table: - table_empty = self._is_table_empty(project_id, dataset_id, table_id) - if (not table_empty and - write_disposition == BigQueryDisposition.WRITE_EMPTY): - raise RuntimeError( - 'Table %s:%s.%s is not empty but write disposition is WRITE_EMPTY.' - % (project_id, dataset_id, table_id)) - # Delete the table and recreate it (later) if WRITE_TRUNCATE was - # specified. - if write_disposition == BigQueryDisposition.WRITE_TRUNCATE: - self._delete_table(project_id, dataset_id, table_id) - - # Create a new table potentially reusing the schema from a previously - # found table in case the schema was not specified. - if schema is None and found_table is None: - raise RuntimeError( - 'Table %s:%s.%s requires a schema. None can be inferred because the ' - 'table does not exist.' - % (project_id, dataset_id, table_id)) - if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE: - return found_table - else: - # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete - # the table before this point. 
- return self._create_table(project_id=project_id, - dataset_id=dataset_id, - table_id=table_id, - schema=schema or found_table.schema) - - def run_query(self, project_id, query, use_legacy_sql, flatten_results, - dry_run=False): - job_id = self._start_query_job(project_id, query, use_legacy_sql, - flatten_results, job_id=uuid.uuid4().hex, - dry_run=dry_run) - if dry_run: - # If this was a dry run then the fact that we get here means the - # query has no errors. The start_query_job would raise an error otherwise. - return - page_token = None - while True: - response = self._get_query_results(project_id, job_id, page_token) - if not response.jobComplete: - # The jobComplete field can be False if the query request times out - # (default is 10 seconds). Note that this is a timeout for the query - # request not for the actual execution of the query in the service. If - # the request times out we keep trying. This situation is quite possible - # if the query will return a large number of rows. - logging.info('Waiting on response from query: %s ...', query) - time.sleep(1.0) - continue - # We got some results. The last page is signalled by a missing pageToken. - yield response.rows, response.schema - if not response.pageToken: - break - page_token = response.pageToken - - def insert_rows(self, project_id, dataset_id, table_id, rows): - """Inserts rows into the specified table. - - Args: - project_id: The project id owning the table. - dataset_id: The dataset id owning the table. - table_id: The table id. - rows: A list of plain Python dictionaries. Each dictionary is a row and - each key in it is the name of a field. - - Returns: - A tuple (bool, errors). If first element is False then the second element - will be a bigquery.InserttErrorsValueListEntry instance containing - specific errors. - """ - - # Prepare rows for insertion. Of special note is the row ID that we add to - # each row in order to help BigQuery avoid inserting a row multiple times. - # BigQuery will do a best-effort if unique IDs are provided. This situation - # can happen during retries on failures. - # TODO(silviuc): Must add support to writing TableRow's instead of dicts. - final_rows = [] - for row in rows: - json_object = bigquery.JsonObject() - for k, v in row.iteritems(): - json_object.additionalProperties.append( - bigquery.JsonObject.AdditionalProperty( - key=k, value=to_json_value(v))) - final_rows.append( - bigquery.TableDataInsertAllRequest.RowsValueListEntry( - insertId=str(self.unique_row_id), - json=json_object)) - result, errors = self._insert_all_rows( - project_id, dataset_id, table_id, final_rows) - return result, errors - - def _convert_cell_value_to_dict(self, value, field): - if field.type == 'STRING': - # Input: "XYZ" --> Output: "XYZ" - return value - elif field.type == 'BOOLEAN': - # Input: "true" --> Output: True - return value == 'true' - elif field.type == 'INTEGER': - # Input: "123" --> Output: 123 - return int(value) - elif field.type == 'FLOAT': - # Input: "1.23" --> Output: 1.23 - return float(value) - elif field.type == 'TIMESTAMP': - # The UTC should come from the timezone library but this is a known - # issue in python 2.7 so we'll just hardcode it as we're reading using - # utcfromtimestamp. 
- # Input: 1478134176.985864 --> Output: "2016-11-03 00:49:36.985864 UTC" - dt = datetime.datetime.utcfromtimestamp(float(value)) - return dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC') - elif field.type == 'BYTES': - # Input: "YmJi" --> Output: "YmJi" - return value - elif field.type == 'DATE': - # Input: "2016-11-03" --> Output: "2016-11-03" - return value - elif field.type == 'DATETIME': - # Input: "2016-11-03T00:49:36" --> Output: "2016-11-03T00:49:36" - return value - elif field.type == 'TIME': - # Input: "00:49:36" --> Output: "00:49:36" - return value - elif field.type == 'RECORD': - # Note that a schema field object supports also a RECORD type. However - # when querying, the repeated and/or record fields are flattened - # unless we pass the flatten_results flag as False to the source - return self.convert_row_to_dict(value, field) - else: - raise RuntimeError('Unexpected field type: %s' % field.type) - - def convert_row_to_dict(self, row, schema): - """Converts a TableRow instance using the schema to a Python dict.""" - result = {} - for index, field in enumerate(schema.fields): - value = None - if isinstance(schema, bigquery.TableSchema): - cell = row.f[index] - value = from_json_value(cell.v) if cell.v is not None else None - elif isinstance(schema, bigquery.TableFieldSchema): - cell = row['f'][index] - value = cell['v'] if 'v' in cell else None - if field.mode == 'REPEATED': - result[field.name] = [self._convert_cell_value_to_dict(x['v'], field) - for x in value] - elif value is None: - if not field.mode == 'NULLABLE': - raise ValueError('Received \'None\' as the value for the field %s ' - 'but the field is not NULLABLE.', field.name) - result[field.name] = None - else: - result[field.name] = self._convert_cell_value_to_dict(value, field) - return result http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py deleted file mode 100644 index 1299a31..0000000 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ /dev/null @@ -1,812 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Unit tests for BigQuery sources and sinks.""" - -import datetime -import json -import logging -import time -import unittest - -import hamcrest as hc -import mock -from apitools.base.py.exceptions import HttpError - -import apache_beam as beam -from apache_beam.internal.json_value import to_json_value -from apache_beam.io.bigquery import RowAsDictJsonCoder -from apache_beam.io.bigquery import TableRowJsonCoder -from apache_beam.io.bigquery import parse_table_schema_from_json -from apache_beam.io.google_cloud_platform.internal.clients import bigquery -from apache_beam.transforms.display import DisplayData -from apache_beam.transforms.display_test import DisplayDataItemMatcher -from apache_beam.utils.pipeline_options import PipelineOptions - - -class TestRowAsDictJsonCoder(unittest.TestCase): - - def test_row_as_dict(self): - coder = RowAsDictJsonCoder() - test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True} - self.assertEqual(test_value, coder.decode(coder.encode(test_value))) - - def json_compliance_exception(self, value): - with self.assertRaises(ValueError) as exn: - coder = RowAsDictJsonCoder() - test_value = {'s': value} - self.assertEqual(test_value, coder.decode(coder.encode(test_value))) - self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message) - - def test_invalid_json_nan(self): - self.json_compliance_exception(float('nan')) - - def test_invalid_json_inf(self): - self.json_compliance_exception(float('inf')) - - def test_invalid_json_neg_inf(self): - self.json_compliance_exception(float('-inf')) - - -class TestTableRowJsonCoder(unittest.TestCase): - - def test_row_as_table_row(self): - schema_definition = [ - ('s', 'STRING'), - ('i', 'INTEGER'), - ('f', 'FLOAT'), - ('b', 'BOOLEAN'), - ('r', 'RECORD')] - data_defination = [ - 'abc', - 123, - 123.456, - True, - {'a': 'b'}] - str_def = '{"s": "abc", "i": 123, "f": 123.456, "b": true, "r": {"a": "b"}}' - schema = bigquery.TableSchema( - fields=[bigquery.TableFieldSchema(name=k, type=v) - for k, v in schema_definition]) - coder = TableRowJsonCoder(table_schema=schema) - test_row = bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(e)) for e in data_defination]) - - self.assertEqual(str_def, coder.encode(test_row)) - self.assertEqual(test_row, coder.decode(coder.encode(test_row))) - # A coder without schema can still decode. 
- self.assertEqual( - test_row, TableRowJsonCoder().decode(coder.encode(test_row))) - - def test_row_and_no_schema(self): - coder = TableRowJsonCoder() - test_row = bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(e)) - for e in ['abc', 123, 123.456, True]]) - with self.assertRaises(AttributeError) as ctx: - coder.encode(test_row) - self.assertTrue( - ctx.exception.message.startswith('The TableRowJsonCoder requires')) - - def json_compliance_exception(self, value): - with self.assertRaises(ValueError) as exn: - schema_definition = [('f', 'FLOAT')] - schema = bigquery.TableSchema( - fields=[bigquery.TableFieldSchema(name=k, type=v) - for k, v in schema_definition]) - coder = TableRowJsonCoder(table_schema=schema) - test_row = bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(value))]) - coder.encode(test_row) - self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message) - - def test_invalid_json_nan(self): - self.json_compliance_exception(float('nan')) - - def test_invalid_json_inf(self): - self.json_compliance_exception(float('inf')) - - def test_invalid_json_neg_inf(self): - self.json_compliance_exception(float('-inf')) - - -class TestTableSchemaParser(unittest.TestCase): - def test_parse_table_schema_from_json(self): - string_field = bigquery.TableFieldSchema( - name='s', type='STRING', mode='NULLABLE', description='s description') - number_field = bigquery.TableFieldSchema( - name='n', type='INTEGER', mode='REQUIRED', description='n description') - record_field = bigquery.TableFieldSchema( - name='r', type='RECORD', mode='REQUIRED', description='r description', - fields=[string_field, number_field]) - expected_schema = bigquery.TableSchema(fields=[record_field]) - json_str = json.dumps({'fields': [ - {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', - 'description': 'r description', 'fields': [ - {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', - 'description': 's description'}, - {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', - 'description': 'n description'}]}]}) - self.assertEqual(parse_table_schema_from_json(json_str), - expected_schema) - - -class TestBigQuerySource(unittest.TestCase): - - def test_display_data_item_on_validate_true(self): - source = beam.io.BigQuerySource('dataset.table', validate=True) - - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', True), - DisplayDataItemMatcher('table', 'dataset.table')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_table_reference_display_data(self): - source = beam.io.BigQuerySource('dataset.table') - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('table', 'dataset.table')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - source = beam.io.BigQuerySource('project:dataset.table') - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('table', 'project:dataset.table')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - source = beam.io.BigQuerySource('xyz.com:project:dataset.table') - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', - False), - DisplayDataItemMatcher('table', - 'xyz.com:project:dataset.table')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_parse_table_reference(self): - source = beam.io.BigQuerySource('dataset.table') 
- self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource('project:dataset.table') - self.assertEqual(source.table_reference.projectId, 'project') - self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource('xyz.com:project:dataset.table') - self.assertEqual(source.table_reference.projectId, 'xyz.com:project') - self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource(query='my_query') - self.assertEqual(source.query, 'my_query') - self.assertIsNone(source.table_reference) - self.assertTrue(source.use_legacy_sql) - - def test_query_only_display_data(self): - source = beam.io.BigQuerySource(query='my_query') - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('query', 'my_query')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_specify_query_sql_format(self): - source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True) - self.assertEqual(source.query, 'my_query') - self.assertFalse(source.use_legacy_sql) - - def test_specify_query_flattened_records(self): - source = beam.io.BigQuerySource(query='my_query', flatten_results=False) - self.assertFalse(source.flatten_results) - - def test_specify_query_unflattened_records(self): - source = beam.io.BigQuerySource(query='my_query', flatten_results=True) - self.assertTrue(source.flatten_results) - - def test_specify_query_without_table(self): - source = beam.io.BigQuerySource(query='my_query') - self.assertEqual(source.query, 'my_query') - self.assertIsNone(source.table_reference) - - def test_date_partitioned_table_name(self): - source = beam.io.BigQuerySource('dataset.table$20030102', validate=True) - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', True), - DisplayDataItemMatcher('table', 'dataset.table$20030102')] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - -class TestBigQuerySink(unittest.TestCase): - - def test_table_spec_display_data(self): - sink = beam.io.BigQuerySink('dataset.table') - dd = DisplayData.create_from(sink) - expected_items = [ - DisplayDataItemMatcher('table', 'dataset.table'), - DisplayDataItemMatcher('validation', False)] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_parse_schema_descriptor(self): - sink = beam.io.BigQuerySink( - 'dataset.table', schema='s:STRING, n:INTEGER') - self.assertEqual(sink.table_reference.datasetId, 'dataset') - self.assertEqual(sink.table_reference.tableId, 'table') - result_schema = { - field.name: field.type for field in sink.table_schema.fields} - self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema) - - def test_project_table_display_data(self): - sinkq = beam.io.BigQuerySink('PROJECT:dataset.table') - dd = DisplayData.create_from(sinkq) - expected_items = [ - DisplayDataItemMatcher('table', 'PROJECT:dataset.table'), - DisplayDataItemMatcher('validation', False)] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_simple_schema_as_json(self): - sink = beam.io.BigQuerySink( - 'PROJECT:dataset.table', schema='s:STRING, n:INTEGER') - self.assertEqual( - json.dumps({'fields': [ - {'name': 's', 'type': 'STRING', 'mode': 
'NULLABLE'}, - {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}), - sink.schema_as_json()) - - def test_nested_schema_as_json(self): - string_field = bigquery.TableFieldSchema( - name='s', type='STRING', mode='NULLABLE', description='s description') - number_field = bigquery.TableFieldSchema( - name='n', type='INTEGER', mode='REQUIRED', description='n description') - record_field = bigquery.TableFieldSchema( - name='r', type='RECORD', mode='REQUIRED', description='r description', - fields=[string_field, number_field]) - schema = bigquery.TableSchema(fields=[record_field]) - sink = beam.io.BigQuerySink('dataset.table', schema=schema) - self.assertEqual( - {'fields': [ - {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', - 'description': 'r description', 'fields': [ - {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', - 'description': 's description'}, - {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', - 'description': 'n description'}]}]}, - json.loads(sink.schema_as_json())) - - -class TestBigQueryReader(unittest.TestCase): - - def get_test_rows(self): - now = time.time() - dt = datetime.datetime.utcfromtimestamp(float(now)) - ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC') - expected_rows = [ - { - 'i': 1, - 's': 'abc', - 'f': 2.3, - 'b': True, - 't': ts, - 'dt': '2016-10-31', - 'ts': '22:39:12.627498', - 'dt_ts': '2008-12-25T07:30:00', - 'r': {'s2': 'b'}, - 'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}] - }, - { - 'i': 10, - 's': 'xyz', - 'f': -3.14, - 'b': False, - 'rpr': [], - 't': None, - 'dt': None, - 'ts': None, - 'dt_ts': None, - 'r': None, - }] - - nested_schema = [ - bigquery.TableFieldSchema( - name='s2', type='STRING', mode='NULLABLE')] - nested_schema_2 = [ - bigquery.TableFieldSchema( - name='s3', type='STRING', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='rpr2', type='RECORD', mode='REPEATED', fields=[ - bigquery.TableFieldSchema( - name='rs', type='STRING', mode='REPEATED'), - bigquery.TableFieldSchema( - name='s4', type='STRING', mode='NULLABLE')])] - - schema = bigquery.TableSchema( - fields=[ - bigquery.TableFieldSchema( - name='b', type='BOOLEAN', mode='REQUIRED'), - bigquery.TableFieldSchema( - name='f', type='FLOAT', mode='REQUIRED'), - bigquery.TableFieldSchema( - name='i', type='INTEGER', mode='REQUIRED'), - bigquery.TableFieldSchema( - name='s', type='STRING', mode='REQUIRED'), - bigquery.TableFieldSchema( - name='t', type='TIMESTAMP', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='dt', type='DATE', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='ts', type='TIME', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='dt_ts', type='DATETIME', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='r', type='RECORD', mode='NULLABLE', - fields=nested_schema), - bigquery.TableFieldSchema( - name='rpr', type='RECORD', mode='REPEATED', - fields=nested_schema_2)]) - - table_rows = [ - bigquery.TableRow(f=[ - bigquery.TableCell(v=to_json_value('true')), - bigquery.TableCell(v=to_json_value(str(2.3))), - bigquery.TableCell(v=to_json_value(str(1))), - bigquery.TableCell(v=to_json_value('abc')), - # For timestamps cannot use str() because it will truncate the - # number representing the timestamp. 
- bigquery.TableCell(v=to_json_value('%f' % now)), - bigquery.TableCell(v=to_json_value('2016-10-31')), - bigquery.TableCell(v=to_json_value('22:39:12.627498')), - bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')), - # For record we cannot use dict because it doesn't create nested - # schemas correctly so we have to use this f,v based format - bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})), - bigquery.TableCell(v=to_json_value([{'v':{'f':[{'v': 'c'}, {'v':[ - {'v':{'f':[{'v':[{'v':'d'}, {'v':'e'}]}, {'v':None}]}}]}]}}])) - ]), - bigquery.TableRow(f=[ - bigquery.TableCell(v=to_json_value('false')), - bigquery.TableCell(v=to_json_value(str(-3.14))), - bigquery.TableCell(v=to_json_value(str(10))), - bigquery.TableCell(v=to_json_value('xyz')), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=to_json_value([]))])] - return table_rows, schema, expected_rows - - def test_read_from_table(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource('dataset.table').reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - - def test_read_from_query(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource(query='query').reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - self.assertTrue(reader.use_legacy_sql) - self.assertTrue(reader.flatten_results) - - def test_read_from_query_sql_format(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - query='query', use_standard_sql=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - self.assertFalse(reader.use_legacy_sql) - self.assertTrue(reader.flatten_results) - - def test_read_from_query_unflatten_records(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - query='query', flatten_results=False).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - 
self.assertEqual(schema, reader.schema) - self.assertTrue(reader.use_legacy_sql) - self.assertFalse(reader.flatten_results) - - def test_using_both_query_and_table_fails(self): - with self.assertRaises(ValueError) as exn: - beam.io.BigQuerySource(table='dataset.table', query='query') - self.assertEqual(exn.exception.message, 'Both a BigQuery table and a' - ' query were specified. Please specify only one of ' - 'these.') - - def test_using_neither_query_nor_table_fails(self): - with self.assertRaises(ValueError) as exn: - beam.io.BigQuerySource() - self.assertEqual(exn.exception.message, 'A BigQuery table or a query' - ' must be specified') - - def test_read_from_table_as_tablerows(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, _ = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - # We set the coder to TableRowJsonCoder, which is a signal that - # the caller wants to see the rows as TableRows. - with beam.io.BigQuerySource( - 'dataset.table', coder=TableRowJsonCoder).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, table_rows) - self.assertEqual(schema, reader.schema) - - @mock.patch('time.sleep', return_value=None) - def test_read_from_table_and_job_complete_retry(self, patched_time_sleep): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - # Return jobComplete=False on first call to trigger the code path where - # query needs to handle waiting a bit. - client.jobs.GetQueryResults.side_effect = [ - bigquery.GetQueryResultsResponse( - jobComplete=False), - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema)] - actual_rows = [] - with beam.io.BigQuerySource('dataset.table').reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - - def test_read_from_table_and_multiple_pages(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference( - jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - # Return a pageToken on first call to trigger the code path where - # query needs to handle multiple pages of results. - client.jobs.GetQueryResults.side_effect = [ - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema, - pageToken='token'), - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema)] - actual_rows = [] - with beam.io.BigQuerySource('dataset.table').reader(client) as reader: - for row in reader: - actual_rows.append(row) - # We return expected rows for each of the two pages of results so we - # adjust our expectation below accordingly. - self.assertEqual(actual_rows, expected_rows * 2) - - def test_table_schema_without_project(self): - # Reader should pick executing project by default. 
- source = beam.io.BigQuerySource(table='mydataset.mytable') - options = PipelineOptions(flags=['--project', 'myproject']) - source.pipeline_options = options - reader = source.reader() - self.assertEquals('SELECT * FROM [myproject:mydataset.mytable];', - reader.query) - - -class TestBigQueryWriter(unittest.TestCase): - - @mock.patch('time.sleep', return_value=None) - def test_no_table_and_create_never(self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER - with self.assertRaises(RuntimeError) as exn: - with beam.io.BigQuerySink( - 'project:dataset.table', - create_disposition=create_disposition).writer(client): - pass - self.assertEqual( - exn.exception.message, - 'Table project:dataset.table not found but create disposition is ' - 'CREATE_NEVER.') - - def test_no_table_and_create_if_needed(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - client.tables.Insert.return_value = table - create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED - with beam.io.BigQuerySink( - 'project:dataset.table', - schema='somefield:INTEGER', - create_disposition=create_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tables.Insert.called) - - @mock.patch('time.sleep', return_value=None) - def test_no_table_and_create_if_needed_and_no_schema( - self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED - with self.assertRaises(RuntimeError) as exn: - with beam.io.BigQuerySink( - 'project:dataset.table', - create_disposition=create_disposition).writer(client): - pass - self.assertEqual( - exn.exception.message, - 'Table project:dataset.table requires a schema. 
None can be inferred ' - 'because the table does not exist.') - - @mock.patch('time.sleep', return_value=None) - def test_table_not_empty_and_write_disposition_empty( - self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.return_value = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tabledata.List.return_value = bigquery.TableDataList(totalRows=1) - write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY - with self.assertRaises(RuntimeError) as exn: - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertEqual( - exn.exception.message, - 'Table project:dataset.table is not empty but write disposition is ' - 'WRITE_EMPTY.') - - def test_table_empty_and_write_disposition_empty(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tabledata.List.return_value = bigquery.TableDataList(totalRows=0) - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tabledata.List.called) - self.assertFalse(client.tables.Delete.called) - self.assertFalse(client.tables.Insert.called) - - def test_table_with_write_disposition_truncate(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tables.Delete.called) - self.assertTrue(client.tables.Insert.called) - - def test_table_with_write_disposition_append(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertFalse(client.tables.Delete.called) - self.assertFalse(client.tables.Insert.called) - - def test_rows_are_written(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND - - insert_response = mock.Mock() - insert_response.insertErrors = [] - client.tabledata.InsertAll.return_value = insert_response - - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client) as writer: - writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14}) 
- - sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14} - expected_rows = [] - json_object = bigquery.JsonObject() - for k, v in sample_row.iteritems(): - json_object.additionalProperties.append( - bigquery.JsonObject.AdditionalProperty( - key=k, value=to_json_value(v))) - expected_rows.append( - bigquery.TableDataInsertAllRequest.RowsValueListEntry( - insertId='_1', # First row ID generated with prefix '' - json=json_object)) - client.tabledata.InsertAll.assert_called_with( - bigquery.BigqueryTabledataInsertAllRequest( - projectId='project', datasetId='dataset', tableId='table', - tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest( - rows=expected_rows))) - - def test_table_schema_without_project(self): - # Writer should pick executing project by default. - sink = beam.io.BigQuerySink(table='mydataset.mytable') - options = PipelineOptions(flags=['--project', 'myproject']) - sink.pipeline_options = options - writer = sink.writer() - self.assertEquals('myproject', writer.project_id) - - -class TestBigQueryWrapper(unittest.TestCase): - - def test_delete_non_existing_dataset(self): - client = mock.Mock() - client.datasets.Delete.side_effect = HttpError( - response={'status': '404'}, url='', content='') - wrapper = beam.io.bigquery.BigQueryWrapper(client) - wrapper._delete_dataset('', '') - self.assertTrue(client.datasets.Delete.called) - - @mock.patch('time.sleep', return_value=None) - def test_delete_dataset_retries_fail(self, patched_time_sleep): - client = mock.Mock() - client.datasets.Delete.side_effect = ValueError("Cannot delete") - wrapper = beam.io.bigquery.BigQueryWrapper(client) - with self.assertRaises(ValueError) as _: - wrapper._delete_dataset('', '') - self.assertEqual( - beam.io.bigquery.MAX_RETRIES + 1, client.datasets.Delete.call_count) - self.assertTrue(client.datasets.Delete.called) - - def test_delete_non_existing_table(self): - client = mock.Mock() - client.tables.Delete.side_effect = HttpError( - response={'status': '404'}, url='', content='') - wrapper = beam.io.bigquery.BigQueryWrapper(client) - wrapper._delete_table('', '', '') - self.assertTrue(client.tables.Delete.called) - - @mock.patch('time.sleep', return_value=None) - def test_delete_table_retries_fail(self, patched_time_sleep): - client = mock.Mock() - client.tables.Delete.side_effect = ValueError("Cannot delete") - wrapper = beam.io.bigquery.BigQueryWrapper(client) - with self.assertRaises(ValueError) as _: - wrapper._delete_table('', '', '') - self.assertTrue(client.tables.Delete.called) - - @mock.patch('time.sleep', return_value=None) - def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep): - client = mock.Mock() - client.datasets.Delete.side_effect = [ - HttpError( - response={'status': '408'}, url='', content=''), - bigquery.BigqueryDatasetsDeleteResponse() - ] - wrapper = beam.io.bigquery.BigQueryWrapper(client) - wrapper._delete_dataset('', '') - self.assertTrue(client.datasets.Delete.called) - - @mock.patch('time.sleep', return_value=None) - def test_delete_table_retries_for_timeouts(self, patched_time_sleep): - client = mock.Mock() - client.tables.Delete.side_effect = [ - HttpError( - response={'status': '408'}, url='', content=''), - bigquery.BigqueryTablesDeleteResponse() - ] - wrapper = beam.io.bigquery.BigQueryWrapper(client) - wrapper._delete_table('', '', '') - self.assertTrue(client.tables.Delete.called) - - @mock.patch('time.sleep', return_value=None) - def test_temporary_dataset_is_unique(self, patched_time_sleep): - client = mock.Mock() - 
client.datasets.Get.return_value = bigquery.Dataset( - datasetReference=bigquery.DatasetReference( - projectId='project_id', datasetId='dataset_id')) - wrapper = beam.io.bigquery.BigQueryWrapper(client) - with self.assertRaises(RuntimeError) as _: - wrapper.create_temporary_dataset('project_id') - self.assertTrue(client.datasets.Get.called) - - def test_get_or_create_dataset_created(self): - client = mock.Mock() - client.datasets.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - client.datasets.Insert.return_value = bigquery.Dataset( - datasetReference=bigquery.DatasetReference( - projectId='project_id', datasetId='dataset_id')) - wrapper = beam.io.bigquery.BigQueryWrapper(client) - new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id') - self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id') - - def test_get_or_create_dataset_fetched(self): - client = mock.Mock() - client.datasets.Get.return_value = bigquery.Dataset( - datasetReference=bigquery.DatasetReference( - projectId='project_id', datasetId='dataset_id')) - wrapper = beam.io.bigquery.BigQueryWrapper(client) - new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id') - self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id') - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/datastore/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py deleted file mode 100644 index cce3aca..0000000 --- a/sdks/python/apache_beam/io/datastore/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/datastore/v1/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py deleted file mode 100644 index cce3aca..0000000 --- a/sdks/python/apache_beam/io/datastore/v1/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#
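----------------------------------------------------------------------
For readers following the move, below is a minimal, illustrative usage sketch (not part of this commit) of the public surface exercised by the relocated bigquery_test.py -- beam.io.BigQuerySource, beam.io.BigQuerySink and beam.io.BigQueryDisposition -- on the assumption that the top-level beam.io aliases continue to re-export the module from its new google_cloud_platform location. All project, dataset and table names are placeholders.

    import apache_beam as beam

    # Hypothetical project name passed via standard pipeline flags.
    p = beam.Pipeline(argv=['--project', 'my-project'])

    # Read rows from a table; by default each element is a dict keyed by
    # column name, as the reader tests above expect.
    rows = p | 'read' >> beam.io.Read(
        beam.io.BigQuerySource('my_dataset.my_table'))

    # Write the rows back out, creating the destination table if needed and
    # appending to it, mirroring the dispositions covered by the writer tests.
    rows | 'write' >> beam.io.Write(
        beam.io.BigQuerySink(
            'my-project:my_dataset.my_output_table',
            schema='s:STRING, n:INTEGER',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    p.run()

A query-based read would instead construct beam.io.BigQuerySource(query='...', use_standard_sql=True), as covered by the query-format tests above; the construction is unchanged by this move since only the module path, not the API, is relocated.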