[BEAM-1218] Move GCP specific IO into separate module

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/908c8532
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/908c8532
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/908c8532

Branch: refs/heads/master
Commit: 908c85327ce17a0ed64401d1e86cb86396284fdb
Parents: 2872f86
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Sat Feb 18 22:52:06 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Sun Feb 19 16:21:25 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |    4 +-
 sdks/python/apache_beam/io/__init__.py          |    4 +-
 sdks/python/apache_beam/io/bigquery.py          | 1082 ------------------
 sdks/python/apache_beam/io/bigquery_test.py     |  812 -------------
 .../python/apache_beam/io/datastore/__init__.py |   16 -
 .../apache_beam/io/datastore/v1/__init__.py     |   16 -
 .../apache_beam/io/datastore/v1/datastoreio.py  |  391 -------
 .../io/datastore/v1/datastoreio_test.py         |  237 ----
 .../io/datastore/v1/fake_datastore.py           |   92 --
 .../apache_beam/io/datastore/v1/helper.py       |  267 -----
 .../apache_beam/io/datastore/v1/helper_test.py  |  256 -----
 .../io/datastore/v1/query_splitter.py           |  269 -----
 .../io/datastore/v1/query_splitter_test.py      |  201 ----
 sdks/python/apache_beam/io/fileio.py            |    2 +-
 sdks/python/apache_beam/io/gcsio.py             |  871 --------------
 sdks/python/apache_beam/io/gcsio_test.py        |  786 -------------
 .../io/google_cloud_platform/bigquery.py        | 1082 ++++++++++++++++++
 .../io/google_cloud_platform/bigquery_test.py   |  813 +++++++++++++
 .../google_cloud_platform/datastore/__init__.py |   16 +
 .../datastore/v1/__init__.py                    |   16 +
 .../datastore/v1/datastoreio.py                 |  391 +++++++
 .../datastore/v1/datastoreio_test.py            |  237 ++++
 .../datastore/v1/fake_datastore.py              |   92 ++
 .../datastore/v1/helper.py                      |  267 +++++
 .../datastore/v1/helper_test.py                 |  256 +++++
 .../datastore/v1/query_splitter.py              |  269 +++++
 .../datastore/v1/query_splitter_test.py         |  201 ++++
 .../io/google_cloud_platform/gcsio.py           |  871 ++++++++++++++
 .../io/google_cloud_platform/gcsio_test.py      |  786 +++++++++++++
 .../io/google_cloud_platform/pubsub.py          |   91 ++
 .../io/google_cloud_platform/pubsub_test.py     |   63 +
 sdks/python/apache_beam/io/pubsub.py            |   91 --
 sdks/python/apache_beam/io/pubsub_test.py       |   62 -
 33 files changed, 5456 insertions(+), 5454 deletions(-)
----------------------------------------------------------------------

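For code that imported the moved modules directly, a minimal before/after sketch of the path change; the new locations are taken from the diffstat above, and imports that go through the top-level apache_beam.io package (see the __init__.py hunk below) are unaffected:

    # Old locations (removed by this commit):
    #   from apache_beam.io.bigquery import BigQuerySource
    #   from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore

    # New locations under the GCP-specific package:
    from apache_beam.io.google_cloud_platform.bigquery import BigQuerySource
    from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore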

http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 6f081df..e7f28b0 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -867,8 +867,8 @@ def model_datastoreio():
   import googledatastore
   import apache_beam as beam
   from apache_beam.utils.pipeline_options import PipelineOptions
-  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
-  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import WriteToDatastore
 
   project = 'my_project'
   kind = 'my_kind'

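A hedged usage sketch for the relocated Datastore transform; the commented ('my_project', query) arguments are assumptions based on the surrounding snippet, not part of this hunk:

    import apache_beam as beam
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore

    p = beam.Pipeline(options=PipelineOptions())
    # 'query' would be a googledatastore query built as in model_datastoreio()
    # above; the argument order here is an assumption.
    # entities = p | 'read' >> ReadFromDatastore('my_project', query)
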
http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 13ce36f..972ed53 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -19,13 +19,13 @@
 
 # pylint: disable=wildcard-import
 from apache_beam.io.avroio import *
-from apache_beam.io.bigquery import *
 from apache_beam.io.fileio import *
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Sink
 from apache_beam.io.iobase import Write
 from apache_beam.io.iobase import Writer
-from apache_beam.io.pubsub import *
 from apache_beam.io.textio import *
 from apache_beam.io.tfrecordio import *
 from apache_beam.io.range_trackers import *
+from apache_beam.io.google_cloud_platform.bigquery import *
+from apache_beam.io.google_cloud_platform.pubsub import *

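Because the GCP modules are still re-exported here via wildcard imports, code using the top-level names should keep working; a small sketch, assuming the moved modules keep the same __all__ exports ('dataset.table' and the schema string are placeholders):

    import apache_beam as beam

    # Both names still resolve through apache_beam.io after the move.
    source = beam.io.BigQuerySource(query='SELECT 1')
    sink = beam.io.BigQuerySink('dataset.table',
                                schema='month:STRING,event_count:INTEGER')
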
http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
deleted file mode 100644
index 3beecea..0000000
--- a/sdks/python/apache_beam/io/bigquery.py
+++ /dev/null
@@ -1,1082 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""BigQuery sources and sinks.
-
-This module implements reading from and writing to BigQuery tables. It relies
-on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema,
-TableRow, and TableCell. The default mode is to return table rows read from a
-BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink
-accepts PCollections of dictionaries. This is done for more convenient
-programming.  If desired, the native TableRow objects can be used throughout to
-represent rows (use an instance of TableRowJsonCoder as a coder argument when
-creating the sources or sinks respectively).
-
-Also, for programming convenience, instances of TableReference and TableSchema
-have a string representation that can be used for the corresponding arguments:
-
-  - TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
-  - TableSchema can be a NAME:TYPE{,NAME:TYPE}* string
-    (e.g. 'month:STRING,event_count:INTEGER').
-
-The syntax supported is described here:
-https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
-
-BigQuery sources can be used as main inputs or side inputs. A main input
-(common case) is expected to be massive and will be split into manageable chunks
-and processed in parallel. Side inputs are expected to be small and will be read
-completely every time a ParDo DoFn gets executed. In the example below the
-lambda function implementing the DoFn for the Map transform will get on each
-call *one* row of the main table and *all* rows of the side table. The runner
-may use some caching techniques to share the side inputs between calls in order
-to avoid excessive reading:
-
-  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource())
-  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource())
-  results = (
-      main_table
-      | 'process data' >> beam.Map(
-          lambda element, side_input: ..., AsList(side_table)))
-
-There is no difference in how main and side inputs are read. What makes the
-side_table a 'side input' is the AsList wrapper used when passing the table
-as a parameter to the Map transform. AsList signals to the execution framework
-that its input should be made available whole.
-
-The main and side inputs are implemented differently. Reading a BigQuery table
-as main input entails exporting the table to a set of GCS files (currently in
-JSON format) and then processing those files. Reading the same table as a side
-input entails querying the table for all its rows. The coder argument on
-BigQuerySource controls the reading of the lines in the export files (i.e.,
-transform a JSON object into a PCollection element). The coder is not involved
-when the same table is read as a side input since there is no intermediate
-format involved. We get the table rows directly from the BigQuery service with
-a query.
-
-Users may provide a query to read from rather than reading all of a BigQuery
-table. If specified, the result obtained by executing the specified query will
-be used as the data of the input transform.
-
-  query_results = pipeline | beam.io.Read(beam.io.BigQuerySource(
-      query='SELECT year, mean_temp FROM samples.weather_stations'))
-
-When creating a BigQuery input transform, users should provide either a query
-or a table. Pipeline construction will fail with a validation error if neither
-or both are specified.
-
-*** Short introduction to BigQuery concepts ***
-Tables have rows (TableRow) and each row has cells (TableCell).
-A table has a schema (TableSchema), which in turn describes the schema of each
-cell (TableFieldSchema). The terms field and cell are used interchangeably.
-
-TableSchema: Describes the schema (types and order) for values in each row.
-  Has one attribute, 'fields', which is a list of TableFieldSchema objects.
-
-TableFieldSchema: Describes the schema (type, name) for one field.
-  Has several attributes, including 'name' and 'type'. Common values for
-  the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible
-  values are described at:
-  https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes
-
-TableRow: Holds all values in a table row. Has one attribute, 'f', which is a
-  list of TableCell instances.
-
-TableCell: Holds the value for one cell (or field).  Has one attribute,
-  'v', which is a JsonValue instance. This class is defined in
-  apitools.base.py.extra_types.py module.
-"""
-
-from __future__ import absolute_import
-
-import collections
-import datetime
-import json
-import logging
-import re
-import time
-import uuid
-
-from apitools.base.py.exceptions import HttpError
-
-from apache_beam import coders
-from apache_beam.internal import auth
-from apache_beam.internal.json_value import from_json_value
-from apache_beam.internal.json_value import to_json_value
-from apache_beam.runners.google_cloud_dataflow.native_io import iobase as dataflow_io
-from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils import retry
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-
-# Protect against environments where bigquery library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apache_beam.io.google_cloud_platform.internal.clients import bigquery
-except ImportError:
-  pass
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
-__all__ = [
-    'TableRowJsonCoder',
-    'BigQueryDisposition',
-    'BigQuerySource',
-    'BigQuerySink',
-    ]
-
-JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
-MAX_RETRIES = 3
-
-
-class RowAsDictJsonCoder(coders.Coder):
-  """A coder for a table row (represented as a dict) to/from a JSON string.
-
-  This is the default coder for sources and sinks if the coder argument is not
-  specified.
-  """
-
-  def encode(self, table_row):
-    # The normal error when dumping NAN/INF values is:
-    # ValueError: Out of range float values are not JSON compliant
-    # This code will catch this error to emit an error that explains
-    # to the programmer that they have used NAN/INF values.
-    try:
-      return json.dumps(table_row, allow_nan=False)
-    except ValueError as e:
-      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
-
-  def decode(self, encoded_table_row):
-    return json.loads(encoded_table_row)
-
-
-class TableRowJsonCoder(coders.Coder):
-  """A coder for a TableRow instance to/from a JSON string.
-
-  Note that the encoding operation (used when writing to sinks) requires the
-  table schema in order to obtain the ordered list of field names. Reading from
-  sources on the other hand does not need the table schema.
-  """
-
-  def __init__(self, table_schema=None):
-    # The table schema is needed for encoding TableRows as JSON (writing to
-    # sinks) because the ordered list of field names is used in the JSON
-    # representation.
-    self.table_schema = table_schema
-    # Precompute field names since we need them for row encoding.
-    if self.table_schema:
-      self.field_names = tuple(fs.name for fs in self.table_schema.fields)
-
-  def encode(self, table_row):
-    if self.table_schema is None:
-      raise AttributeError(
-          'The TableRowJsonCoder requires a table schema for '
-          'encoding operations. Please specify a table_schema argument.')
-    try:
-      return json.dumps(
-          collections.OrderedDict(
-              zip(self.field_names,
-                  [from_json_value(f.v) for f in table_row.f])),
-          allow_nan=False)
-    except ValueError as e:
-      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
-
-  def decode(self, encoded_table_row):
-    od = json.loads(
-        encoded_table_row, object_pairs_hook=collections.OrderedDict)
-    return bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
-
-
-def parse_table_schema_from_json(schema_string):
-  """Parse the Table Schema provided as string.
-
-  Args:
-    schema_string: String serialized table schema, should be a valid JSON.
-
-  Returns:
-    A TableSchema of the BigQuery export from either the Query or the Table.
-  """
-  json_schema = json.loads(schema_string)
-
-  def _parse_schema_field(field):
-    """Parse a single schema field from dictionary.
-
-    Args:
-      field: Dictionary object containing serialized schema.
-
-    Returns:
-      A TableFieldSchema for a single column in BigQuery.
-    """
-    schema = bigquery.TableFieldSchema()
-    schema.name = field['name']
-    schema.type = field['type']
-    if 'mode' in field:
-      schema.mode = field['mode']
-    else:
-      schema.mode = 'NULLABLE'
-    if 'description' in field:
-      schema.description = field['description']
-    if 'fields' in field:
-      schema.fields = [_parse_schema_field(x) for x in field['fields']]
-    return schema
-
-  fields = [_parse_schema_field(f) for f in json_schema['fields']]
-  return bigquery.TableSchema(fields=fields)
-
-
-class BigQueryDisposition(object):
-  """Class holding standard strings used for create and write dispositions."""
-
-  CREATE_NEVER = 'CREATE_NEVER'
-  CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
-  WRITE_TRUNCATE = 'WRITE_TRUNCATE'
-  WRITE_APPEND = 'WRITE_APPEND'
-  WRITE_EMPTY = 'WRITE_EMPTY'
-
-  @staticmethod
-  def validate_create(disposition):
-    values = (BigQueryDisposition.CREATE_NEVER,
-              BigQueryDisposition.CREATE_IF_NEEDED)
-    if disposition not in values:
-      raise ValueError(
-          'Invalid create disposition %s. Expecting %s' % (disposition, values))
-    return disposition
-
-  @staticmethod
-  def validate_write(disposition):
-    values = (BigQueryDisposition.WRITE_TRUNCATE,
-              BigQueryDisposition.WRITE_APPEND,
-              BigQueryDisposition.WRITE_EMPTY)
-    if disposition not in values:
-      raise ValueError(
-          'Invalid write disposition %s. Expecting %s' % (disposition, values))
-    return disposition
-
-
-def _parse_table_reference(table, dataset=None, project=None):
-  """Parses a table reference into a (project, dataset, table) tuple.
-
-  Args:
-    table: The ID of the table. The ID must contain only letters
-      (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is None
-      then the table argument must contain the entire table reference:
-      'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a
-      bigquery.TableReference instance in which case dataset and project are
-      ignored and the reference is returned as a result.  Additionally, for date
-      partitioned tables, appending '$YYYYmmdd' to the table name is supported,
-      e.g. 'DATASET.TABLE$YYYYmmdd'.
-    dataset: The ID of the dataset containing this table or null if the table
-      reference is specified entirely by the table argument.
-    project: The ID of the project containing this table or null if the table
-      reference is specified entirely by the table (and possibly dataset)
-      argument.
-
-  Returns:
-    A bigquery.TableReference object. The object has the following attributes:
-    projectId, datasetId, and tableId.
-
-  Raises:
-    ValueError: if the table reference as a string does not match the expected
-      format.
-  """
-
-  if isinstance(table, bigquery.TableReference):
-    return table
-
-  table_reference = bigquery.TableReference()
-  # If dataset argument is not specified, the expectation is that the
-  # table argument will contain a full table reference instead of just a
-  # table name.
-  if dataset is None:
-    match = re.match(
-        r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table)
-    if not match:
-      raise ValueError(
-          'Expected a table reference (PROJECT:DATASET.TABLE or '
-          'DATASET.TABLE) instead of %s.' % table)
-    table_reference.projectId = match.group('project')
-    table_reference.datasetId = match.group('dataset')
-    table_reference.tableId = match.group('table')
-  else:
-    table_reference.projectId = project
-    table_reference.datasetId = dataset
-    table_reference.tableId = table
-  return table_reference
-
-
-# -----------------------------------------------------------------------------
-# BigQuerySource, BigQuerySink.
-
-
-class BigQuerySource(dataflow_io.NativeSource):
-  """A source based on a BigQuery table."""
-
-  def __init__(self, table=None, dataset=None, project=None, query=None,
-               validate=False, coder=None, use_standard_sql=False,
-               flatten_results=True):
-    """Initialize a BigQuerySource.
-
-    Args:
-      table: The ID of a BigQuery table. If specified all data of the table
-        will be used as input of the current source. The ID must contain only
-        letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset
-        and query arguments are None then the table argument must contain the
-        entire table reference specified as: 'DATASET.TABLE' or
-        'PROJECT:DATASET.TABLE'.
-      dataset: The ID of the dataset containing this table or null if the table
-        reference is specified entirely by the table argument or a query is
-        specified.
-      project: The ID of the project containing this table or null if the table
-        reference is specified entirely by the table argument or a query is
-        specified.
-      query: A query to be used instead of arguments table, dataset, and
-        project.
-      validate: If true, various checks will be done when source gets
-        initialized (e.g., is table present?). This should be True for most
-        scenarios in order to catch errors as early as possible (pipeline
-        construction instead of pipeline execution). It should be False if the
-        table is created during pipeline execution by a previous step.
-      coder: The coder for the table rows if serialized to disk. If None, then
-        the default coder is RowAsDictJsonCoder, which will interpret every line
-        in a file as a JSON serialized dictionary. This argument needs a value
-        only in special cases when returning table rows as dictionaries is not
-        desirable.
-      use_standard_sql: Specifies whether to use BigQuery's standard
-        SQL dialect for this query. The default value is False. If set to True,
-        the query will use BigQuery's updated SQL dialect with improved
-        standards compliance. This parameter is ignored for table inputs.
-      flatten_results: Flattens all nested and repeated fields in the
-        query results. The default value is true.
-
-    Raises:
-      ValueError: if any of the following is true
-      (1) the table reference as a string does not match the expected format
-      (2) neither a table nor a query is specified
-      (3) both a table and a query is specified.
-    """
-
-    if table is not None and query is not None:
-      raise ValueError('Both a BigQuery table and a query were specified.'
-                       ' Please specify only one of these.')
-    elif table is None and query is None:
-      raise ValueError('A BigQuery table or a query must be specified')
-    elif table is not None:
-      self.table_reference = _parse_table_reference(table, dataset, project)
-      self.query = None
-      self.use_legacy_sql = True
-    else:
-      self.query = query
-      # TODO(BEAM-1082): Change the internal flag to be standard_sql
-      self.use_legacy_sql = not use_standard_sql
-      self.table_reference = None
-
-    self.validate = validate
-    self.flatten_results = flatten_results
-    self.coder = coder or RowAsDictJsonCoder()
-
-  def display_data(self):
-    if self.query is not None:
-      res = {'query': DisplayDataItem(self.query, label='Query')}
-    else:
-      if self.table_reference.projectId is not None:
-        tableSpec = '{}:{}.{}'.format(self.table_reference.projectId,
-                                      self.table_reference.datasetId,
-                                      self.table_reference.tableId)
-      else:
-        tableSpec = '{}.{}'.format(self.table_reference.datasetId,
-                                   self.table_reference.tableId)
-      res = {'table': DisplayDataItem(tableSpec, label='Table')}
-
-    res['validation'] = DisplayDataItem(self.validate,
-                                        label='Validation Enabled')
-    return res
-
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'bigquery'
-
-  def reader(self, test_bigquery_client=None):
-    return BigQueryReader(
-        source=self,
-        test_bigquery_client=test_bigquery_client,
-        use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results)
-
-
-class BigQuerySink(dataflow_io.NativeSink):
-  """A sink based on a BigQuery table."""
-
-  def __init__(self, table, dataset=None, project=None, schema=None,
-               create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
-               write_disposition=BigQueryDisposition.WRITE_EMPTY,
-               validate=False, coder=None):
-    """Initialize a BigQuerySink.
-
-    Args:
-      table: The ID of the table. The ID must contain only letters
-        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
-        None then the table argument must contain the entire table reference
-        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
-      dataset: The ID of the dataset containing this table or null if the table
-        reference is specified entirely by the table argument.
-      project: The ID of the project containing this table or null if the table
-        reference is specified entirely by the table argument.
-      schema: The schema to be used if the BigQuery table to write has to be
-        created. This can be either specified as a 'bigquery.TableSchema' object
-        or a single string  of the form 'field1:type1,field2:type2,field3:type3'
-        that defines a comma separated list of fields. Here 'type' should
-        specify the BigQuery type of the field. Single string based schemas do
-        not support nested fields, repeated fields, or specifying a BigQuery
-        mode for fields (mode will always be set to 'NULLABLE').
-      create_disposition: A string describing what happens if the table does not
-        exist. Possible values are:
-        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
-        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
-      write_disposition: A string describing what happens if the table has
-        already some data. Possible values are:
-        -  BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
-        -  BigQueryDisposition.WRITE_APPEND: add to existing rows.
-        -  BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
-      validate: If true, various checks will be done when sink gets
-        initialized (e.g., is table present given the disposition arguments?).
-        This should be True for most scenarios in order to catch errors as early
-        as possible (pipeline construction instead of pipeline execution). It
-        should be False if the table is created during pipeline execution by a
-        previous step.
-      coder: The coder for the table rows if serialized to disk. If None, then
-        the default coder is RowAsDictJsonCoder, which will interpret every
-        element written to the sink as a dictionary that will be JSON serialized
-        as a line in a file. This argument needs a value only in special cases
-        when writing table rows as dictionaries is not desirable.
-
-    Raises:
-      TypeError: if the schema argument is not a string or a TableSchema object.
-      ValueError: if the table reference as a string does not match the expected
-      format.
-    """
-    self.table_reference = _parse_table_reference(table, dataset, project)
-    # Transform the table schema into a bigquery.TableSchema instance.
-    if isinstance(schema, basestring):
-      # TODO(silviuc): Should add a regex-based validation of the format.
-      table_schema = bigquery.TableSchema()
-      schema_list = [s.strip(' ') for s in schema.split(',')]
-      for field_and_type in schema_list:
-        field_name, field_type = field_and_type.split(':')
-        field_schema = bigquery.TableFieldSchema()
-        field_schema.name = field_name
-        field_schema.type = field_type
-        field_schema.mode = 'NULLABLE'
-        table_schema.fields.append(field_schema)
-      self.table_schema = table_schema
-    elif schema is None:
-      # TODO(silviuc): Should check that table exists if no schema specified.
-      self.table_schema = schema
-    elif isinstance(schema, bigquery.TableSchema):
-      self.table_schema = schema
-    else:
-      raise TypeError('Unexpected schema argument: %s.' % schema)
-
-    self.create_disposition = BigQueryDisposition.validate_create(
-        create_disposition)
-    self.write_disposition = BigQueryDisposition.validate_write(
-        write_disposition)
-    self.validate = validate
-    self.coder = coder or RowAsDictJsonCoder()
-
-  def display_data(self):
-    res = {}
-    if self.table_reference is not None:
-      tableSpec = '{}.{}'.format(self.table_reference.datasetId,
-                                 self.table_reference.tableId)
-      if self.table_reference.projectId is not None:
-        tableSpec = '{}:{}'.format(self.table_reference.projectId,
-                                   tableSpec)
-      res['table'] = DisplayDataItem(tableSpec, label='Table')
-
-    res['validation'] = DisplayDataItem(self.validate,
-                                        label="Validation Enabled")
-    return res
-
-  def schema_as_json(self):
-    """Returns the TableSchema associated with the sink as a JSON string."""
-
-    def schema_list_as_object(schema_list):
-      """Returns a list of TableFieldSchema objects as a list of dicts."""
-      fields = []
-      for f in schema_list:
-        fs = {'name': f.name, 'type': f.type}
-        if f.description is not None:
-          fs['description'] = f.description
-        if f.mode is not None:
-          fs['mode'] = f.mode
-        if f.type.lower() == 'record':
-          fs['fields'] = schema_list_as_object(f.fields)
-        fields.append(fs)
-      return fields
-    return json.dumps(
-        {'fields': schema_list_as_object(self.table_schema.fields)})
-
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'bigquery'
-
-  def writer(self, test_bigquery_client=None, buffer_size=None):
-    return BigQueryWriter(
-        sink=self, test_bigquery_client=test_bigquery_client,
-        buffer_size=buffer_size)
-
-
-# -----------------------------------------------------------------------------
-# BigQueryReader, BigQueryWriter.
-
-
-class BigQueryReader(dataflow_io.NativeSourceReader):
-  """A reader for a BigQuery source."""
-
-  def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True,
-               flatten_results=True):
-    self.source = source
-    self.test_bigquery_client = test_bigquery_client
-    if auth.is_running_in_gce:
-      self.executing_project = auth.executing_project
-    elif hasattr(source, 'pipeline_options'):
-      self.executing_project = (
-          source.pipeline_options.view_as(GoogleCloudOptions).project)
-    else:
-      self.executing_project = None
-
-    # TODO(silviuc): Try to automatically get it from gcloud config info.
-    if not self.executing_project and test_bigquery_client is None:
-      raise RuntimeError(
-          'Missing executing project information. Please use the --project '
-          'command line option to specify it.')
-    self.row_as_dict = isinstance(self.source.coder, RowAsDictJsonCoder)
-    # Schema for the rows being read by the reader. It is initialized the
-    # first time something gets read from the table. It is not required
-    # for reading the field values in each row but could be useful for
-    # getting additional details.
-    self.schema = None
-    self.use_legacy_sql = use_legacy_sql
-    self.flatten_results = flatten_results
-
-    if self.source.query is None:
-      # If table schema did not define a project we default to executing
-      # project.
-      project_id = self.source.table_reference.projectId
-      if not project_id:
-        project_id = self.executing_project
-      self.query = 'SELECT * FROM [%s:%s.%s];' % (
-          project_id,
-          self.source.table_reference.datasetId,
-          self.source.table_reference.tableId)
-    else:
-      self.query = self.source.query
-
-  def __enter__(self):
-    self.client = BigQueryWrapper(client=self.test_bigquery_client)
-    self.client.create_temporary_dataset(self.executing_project)
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.client.clean_up_temporary_dataset(self.executing_project)
-
-  def __iter__(self):
-    for rows, schema in self.client.run_query(
-        project_id=self.executing_project, query=self.query,
-        use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results):
-      if self.schema is None:
-        self.schema = schema
-      for row in rows:
-        if self.row_as_dict:
-          yield self.client.convert_row_to_dict(row, schema)
-        else:
-          yield row
-
-
-class BigQueryWriter(dataflow_io.NativeSinkWriter):
-  """The sink writer for a BigQuerySink."""
-
-  def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
-    self.sink = sink
-    self.test_bigquery_client = test_bigquery_client
-    self.row_as_dict = isinstance(self.sink.coder, RowAsDictJsonCoder)
-    # Buffer used to batch written rows so we reduce communication with the
-    # BigQuery service.
-    self.rows_buffer = []
-    self.rows_buffer_flush_threshold = buffer_size or 1000
-    # Figure out the project, dataset, and table used for the sink.
-    self.project_id = self.sink.table_reference.projectId
-
-    # If table schema did not define a project we default to executing project.
-    if self.project_id is None and hasattr(sink, 'pipeline_options'):
-      self.project_id = (
-          sink.pipeline_options.view_as(GoogleCloudOptions).project)
-
-    assert self.project_id is not None
-
-    self.dataset_id = self.sink.table_reference.datasetId
-    self.table_id = self.sink.table_reference.tableId
-
-  def _flush_rows_buffer(self):
-    if self.rows_buffer:
-      logging.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
-                   self.project_id, self.dataset_id, self.table_id)
-      passed, errors = self.client.insert_rows(
-          project_id=self.project_id, dataset_id=self.dataset_id,
-          table_id=self.table_id, rows=self.rows_buffer)
-      self.rows_buffer = []
-      if not passed:
-        raise RuntimeError('Could not successfully insert rows to BigQuery'
-                           ' table [%s:%s.%s]. Errors: %s'%
-                           (self.project_id, self.dataset_id,
-                            self.table_id, errors))
-
-  def __enter__(self):
-    self.client = BigQueryWrapper(client=self.test_bigquery_client)
-    self.client.get_or_create_table(
-        self.project_id, self.dataset_id, self.table_id, self.sink.table_schema,
-        self.sink.create_disposition, self.sink.write_disposition)
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self._flush_rows_buffer()
-
-  def Write(self, row):
-    self.rows_buffer.append(row)
-    if len(self.rows_buffer) > self.rows_buffer_flush_threshold:
-      self._flush_rows_buffer()
-
-
-# -----------------------------------------------------------------------------
-# BigQueryWrapper.
-
-
-class BigQueryWrapper(object):
-  """BigQuery client wrapper with utilities for querying.
-
-  The wrapper is used to organize all the BigQuery integration points and
-  offer a common place where retry logic for failures can be controlled.
-  In addition it offers various functions used both in sources and sinks
-  (e.g., find and create tables, query a table, etc.).
-  """
-
-  TEMP_TABLE = 'temp_table_'
-  TEMP_DATASET = 'temp_dataset_'
-
-  def __init__(self, client=None):
-    self.client = client or bigquery.BigqueryV2(
-        credentials=auth.get_service_credentials())
-    self._unique_row_id = 0
-    # For testing scenarios where we pass in a client we do not want a
-    # randomized prefix for row IDs.
-    self._row_id_prefix = '' if client else uuid.uuid4()
-    self._temporary_table_suffix = uuid.uuid4().hex
-
-  @property
-  def unique_row_id(self):
-    """Returns a unique row ID (str) used to avoid multiple insertions.
-
-    If the row ID is provided, BigQuery will make a best effort to not insert
-    the same row multiple times for fail and retry scenarios in which the insert
-    request may be issued several times. This comes into play for sinks executed
-    in a local runner.
-
-    Returns:
-      a unique row ID string
-    """
-    self._unique_row_id += 1
-    return '%s_%d' % (self._row_id_prefix, self._unique_row_id)
-
-  def _get_temp_table(self, project_id):
-    return _parse_table_reference(
-        table=BigQueryWrapper.TEMP_TABLE + self._temporary_table_suffix,
-        dataset=BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix,
-        project=project_id)
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _start_query_job(self, project_id, query, use_legacy_sql, flatten_results,
-                       job_id, dry_run=False):
-    reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
-    request = bigquery.BigqueryJobsInsertRequest(
-        projectId=project_id,
-        job=bigquery.Job(
-            configuration=bigquery.JobConfiguration(
-                dryRun=dry_run,
-                query=bigquery.JobConfigurationQuery(
-                    query=query,
-                    useLegacySql=use_legacy_sql,
-                    allowLargeResults=True,
-                    destinationTable=self._get_temp_table(project_id),
-                    flattenResults=flatten_results)),
-            jobReference=reference))
-
-    response = self.client.jobs.Insert(request)
-    return response.jobReference.jobId
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _get_query_results(self, project_id, job_id,
-                         page_token=None, max_results=10000):
-    request = bigquery.BigqueryJobsGetQueryResultsRequest(
-        jobId=job_id, pageToken=page_token, projectId=project_id,
-        maxResults=max_results)
-    response = self.client.jobs.GetQueryResults(request)
-    return response
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_filter)
-  def _insert_all_rows(self, project_id, dataset_id, table_id, rows):
-    # The rows argument is a list of
-    # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
-    # required by the InsertAll() method.
-    request = bigquery.BigqueryTabledataInsertAllRequest(
-        projectId=project_id, datasetId=dataset_id, tableId=table_id,
-        tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-            # TODO(silviuc): Should have an option for skipInvalidRows?
-            # TODO(silviuc): Should have an option for ignoreUnknownValues?
-            rows=rows))
-    response = self.client.tabledata.InsertAll(request)
-    # response.insertErrors is not [] if errors encountered.
-    return not response.insertErrors, response.insertErrors
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _get_table(self, project_id, dataset_id, table_id):
-    request = bigquery.BigqueryTablesGetRequest(
-        projectId=project_id, datasetId=dataset_id, tableId=table_id)
-    response = self.client.tables.Get(request)
-    # The response is a bigquery.Table instance.
-    return response
-
-  def _create_table(self, project_id, dataset_id, table_id, schema):
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId=project_id, datasetId=dataset_id, tableId=table_id),
-        schema=schema)
-    request = bigquery.BigqueryTablesInsertRequest(
-        projectId=project_id, datasetId=dataset_id, table=table)
-    response = self.client.tables.Insert(request)
-    # The response is a bigquery.Table instance.
-    return response
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def get_or_create_dataset(self, project_id, dataset_id):
-    # Check if dataset already exists otherwise create it
-    try:
-      dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
-          projectId=project_id, datasetId=dataset_id))
-      return dataset
-    except HttpError as exn:
-      if exn.status_code == 404:
-        dataset = bigquery.Dataset(
-            datasetReference=bigquery.DatasetReference(
-                projectId=project_id, datasetId=dataset_id))
-        request = bigquery.BigqueryDatasetsInsertRequest(
-            projectId=project_id, dataset=dataset)
-        response = self.client.datasets.Insert(request)
-        # The response is a bigquery.Dataset instance.
-        return response
-      else:
-        raise
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _is_table_empty(self, project_id, dataset_id, table_id):
-    request = bigquery.BigqueryTabledataListRequest(
-        projectId=project_id, datasetId=dataset_id, tableId=table_id,
-        maxResults=1)
-    response = self.client.tabledata.List(request)
-    # The response is a bigquery.TableDataList instance.
-    return response.totalRows == 0
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _delete_table(self, project_id, dataset_id, table_id):
-    request = bigquery.BigqueryTablesDeleteRequest(
-        projectId=project_id, datasetId=dataset_id, tableId=table_id)
-    try:
-      self.client.tables.Delete(request)
-    except HttpError as exn:
-      if exn.status_code == 404:
-        logging.warning('Table %s:%s.%s does not exist', project_id,
-                        dataset_id, table_id)
-        return
-      else:
-        raise
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _delete_dataset(self, project_id, dataset_id, delete_contents=True):
-    request = bigquery.BigqueryDatasetsDeleteRequest(
-        projectId=project_id, datasetId=dataset_id,
-        deleteContents=delete_contents)
-    try:
-      self.client.datasets.Delete(request)
-    except HttpError as exn:
-      if exn.status_code == 404:
-        logging.warning('Dataset %s:%s does not exist', project_id,
-                        dataset_id)
-        return
-      else:
-        raise
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def create_temporary_dataset(self, project_id):
-    dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
-    # Check if dataset exists to make sure that the temporary id is unique
-    try:
-      self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
-          projectId=project_id, datasetId=dataset_id))
-      if project_id is not None:
-        # Unittests don't pass projectIds so they can be run without error
-        raise RuntimeError(
-            'Dataset %s:%s already exists so cannot be used as temporary.'
-            % (project_id, dataset_id))
-    except HttpError as exn:
-      if exn.status_code == 404:
-        logging.warning('Dataset does not exist so we will create it')
-        self.get_or_create_dataset(project_id, dataset_id)
-      else:
-        raise
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def clean_up_temporary_dataset(self, project_id):
-    temp_table = self._get_temp_table(project_id)
-    try:
-      self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
-          projectId=project_id, datasetId=temp_table.datasetId))
-    except HttpError as exn:
-      if exn.status_code == 404:
-        logging.warning('Dataset %s:%s does not exist', project_id,
-                        temp_table.datasetId)
-        return
-      else:
-        raise
-    self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def get_or_create_table(
-      self, project_id, dataset_id, table_id, schema,
-      create_disposition, write_disposition):
-    """Gets or creates a table based on create and write dispositions.
-
-    The function mimics the behavior of BigQuery import jobs when using the
-    same create and write dispositions.
-
-    Args:
-      project_id: The project id owning the table.
-      dataset_id: The dataset id owning the table.
-      table_id: The table id.
-      schema: A bigquery.TableSchema instance or None.
-      create_disposition: CREATE_NEVER or CREATE_IF_NEEDED.
-      write_disposition: WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.
-
-    Returns:
-      A bigquery.Table instance if table was found or created.
-
-    Raises:
-      RuntimeError: For various mismatches between the state of the table and
-        the create/write dispositions passed in. For example if the table is not
-        empty and WRITE_EMPTY was specified then an error will be raised since
-        the table was expected to be empty.
-    """
-    found_table = None
-    try:
-      found_table = self._get_table(project_id, dataset_id, table_id)
-    except HttpError as exn:
-      if exn.status_code == 404:
-        if create_disposition == BigQueryDisposition.CREATE_NEVER:
-          raise RuntimeError(
-              'Table %s:%s.%s not found but create disposition is CREATE_NEVER.'
-              % (project_id, dataset_id, table_id))
-      else:
-        raise
-
-    # If table exists already then handle the semantics for WRITE_EMPTY and
-    # WRITE_TRUNCATE write dispositions.
-    if found_table:
-      table_empty = self._is_table_empty(project_id, dataset_id, table_id)
-      if (not table_empty and
-          write_disposition == BigQueryDisposition.WRITE_EMPTY):
-        raise RuntimeError(
-            'Table %s:%s.%s is not empty but write disposition is WRITE_EMPTY.'
-            % (project_id, dataset_id, table_id))
-      # Delete the table and recreate it (later) if WRITE_TRUNCATE was
-      # specified.
-      if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
-        self._delete_table(project_id, dataset_id, table_id)
-
-    # Create a new table potentially reusing the schema from a previously
-    # found table in case the schema was not specified.
-    if schema is None and found_table is None:
-      raise RuntimeError(
-          'Table %s:%s.%s requires a schema. None can be inferred because the '
-          'table does not exist.'
-          % (project_id, dataset_id, table_id))
-    if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
-      return found_table
-    else:
-      # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
-      # the table before this point.
-      return self._create_table(project_id=project_id,
-                                dataset_id=dataset_id,
-                                table_id=table_id,
-                                schema=schema or found_table.schema)
-
-  def run_query(self, project_id, query, use_legacy_sql, flatten_results,
-                dry_run=False):
-    job_id = self._start_query_job(project_id, query, use_legacy_sql,
-                                   flatten_results, job_id=uuid.uuid4().hex,
-                                   dry_run=dry_run)
-    if dry_run:
-      # If this was a dry run then the fact that we get here means the
-      # query has no errors. The start_query_job would raise an error otherwise.
-      return
-    page_token = None
-    while True:
-      response = self._get_query_results(project_id, job_id, page_token)
-      if not response.jobComplete:
-        # The jobComplete field can be False if the query request times out
-        # (default is 10 seconds). Note that this is a timeout for the query
-        # request not for the actual execution of the query in the service.  If
-        # the request times out we keep trying. This situation is quite possible
-        # if the query will return a large number of rows.
-        logging.info('Waiting on response from query: %s ...', query)
-        time.sleep(1.0)
-        continue
-      # We got some results. The last page is signalled by a missing pageToken.
-      yield response.rows, response.schema
-      if not response.pageToken:
-        break
-      page_token = response.pageToken
-
-  def insert_rows(self, project_id, dataset_id, table_id, rows):
-    """Inserts rows into the specified table.
-
-    Args:
-      project_id: The project id owning the table.
-      dataset_id: The dataset id owning the table.
-      table_id: The table id.
-      rows: A list of plain Python dictionaries. Each dictionary is a row and
-        each key in it is the name of a field.
-
-    Returns:
-      A tuple (bool, errors). If first element is False then the second element
-      will be a bigquery.InsertErrorsValueListEntry instance containing
-      specific errors.
-    """
-
-    # Prepare rows for insertion. Of special note is the row ID that we add to
-    # each row in order to help BigQuery avoid inserting a row multiple times.
-    # BigQuery will do a best-effort if unique IDs are provided. This situation
-    # can happen during retries on failures.
-    # TODO(silviuc): Must add support to writing TableRow's instead of dicts.
-    final_rows = []
-    for row in rows:
-      json_object = bigquery.JsonObject()
-      for k, v in row.iteritems():
-        json_object.additionalProperties.append(
-            bigquery.JsonObject.AdditionalProperty(
-                key=k, value=to_json_value(v)))
-      final_rows.append(
-          bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-              insertId=str(self.unique_row_id),
-              json=json_object))
-    result, errors = self._insert_all_rows(
-        project_id, dataset_id, table_id, final_rows)
-    return result, errors
-
-  def _convert_cell_value_to_dict(self, value, field):
-    if field.type == 'STRING':
-      # Input: "XYZ" --> Output: "XYZ"
-      return value
-    elif field.type == 'BOOLEAN':
-      # Input: "true" --> Output: True
-      return value == 'true'
-    elif field.type == 'INTEGER':
-      # Input: "123" --> Output: 123
-      return int(value)
-    elif field.type == 'FLOAT':
-      # Input: "1.23" --> Output: 1.23
-      return float(value)
-    elif field.type == 'TIMESTAMP':
-      # The UTC should come from the timezone library but this is a known
-      # issue in python 2.7 so we'll just hardcode it as we're reading using
-      # utcfromtimestamp.
-      # Input: 1478134176.985864 --> Output: "2016-11-03 00:49:36.985864 UTC"
-      dt = datetime.datetime.utcfromtimestamp(float(value))
-      return dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
-    elif field.type == 'BYTES':
-      # Input: "YmJi" --> Output: "YmJi"
-      return value
-    elif field.type == 'DATE':
-      # Input: "2016-11-03" --> Output: "2016-11-03"
-      return value
-    elif field.type == 'DATETIME':
-      # Input: "2016-11-03T00:49:36" --> Output: "2016-11-03T00:49:36"
-      return value
-    elif field.type == 'TIME':
-      # Input: "00:49:36" --> Output: "00:49:36"
-      return value
-    elif field.type == 'RECORD':
-      # Note that a schema field object supports also a RECORD type. However
-      # when querying, the repeated and/or record fields are flattened
-      # unless we pass the flatten_results flag as False to the source
-      return self.convert_row_to_dict(value, field)
-    else:
-      raise RuntimeError('Unexpected field type: %s' % field.type)
-
-  def convert_row_to_dict(self, row, schema):
-    """Converts a TableRow instance using the schema to a Python dict."""
-    result = {}
-    for index, field in enumerate(schema.fields):
-      value = None
-      if isinstance(schema, bigquery.TableSchema):
-        cell = row.f[index]
-        value = from_json_value(cell.v) if cell.v is not None else None
-      elif isinstance(schema, bigquery.TableFieldSchema):
-        cell = row['f'][index]
-        value = cell['v'] if 'v' in cell else None
-      if field.mode == 'REPEATED':
-        result[field.name] = [self._convert_cell_value_to_dict(x['v'], field)
-                              for x in value]
-      elif value is None:
-        if not field.mode == 'NULLABLE':
-          raise ValueError('Received \'None\' as the value for the field %s '
-                           'but the field is not NULLABLE.', field.name)
-        result[field.name] = None
-      else:
-        result[field.name] = self._convert_cell_value_to_dict(value, field)
-    return result

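To tie together the read patterns described in the module docstring above (which the diffstat shows re-added with the same line count under io/google_cloud_platform/), a hedged sketch; the table spec and query are placeholders and the pipeline is only constructed, not run:

    import apache_beam as beam
    from apache_beam.pvalue import AsList

    p = beam.Pipeline()

    # Main input: exported to GCS files and split into chunks by the service.
    main_table = p | 'very_big' >> beam.io.Read(
        beam.io.BigQuerySource('PROJECT:DATASET.TABLE'))

    # Side input: fetched with a query and read whole on each DoFn invocation.
    side_table = p | 'not_big' >> beam.io.Read(
        beam.io.BigQuerySource(
            query='SELECT year, mean_temp FROM samples.weather_stations'))

    results = (
        main_table
        | 'process data' >> beam.Map(
            lambda row, side_rows: (row, len(side_rows)), AsList(side_table)))
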
http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
deleted file mode 100644
index 1299a31..0000000
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ /dev/null
@@ -1,812 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for BigQuery sources and sinks."""
-
-import datetime
-import json
-import logging
-import time
-import unittest
-
-import hamcrest as hc
-import mock
-from apitools.base.py.exceptions import HttpError
-
-import apache_beam as beam
-from apache_beam.internal.json_value import to_json_value
-from apache_beam.io.bigquery import RowAsDictJsonCoder
-from apache_beam.io.bigquery import TableRowJsonCoder
-from apache_beam.io.bigquery import parse_table_schema_from_json
-from apache_beam.io.google_cloud_platform.internal.clients import bigquery
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.pipeline_options import PipelineOptions
-
-
-class TestRowAsDictJsonCoder(unittest.TestCase):
-
-  def test_row_as_dict(self):
-    coder = RowAsDictJsonCoder()
-    test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True}
-    self.assertEqual(test_value, coder.decode(coder.encode(test_value)))
-
-  def json_compliance_exception(self, value):
-    with self.assertRaises(ValueError) as exn:
-      coder = RowAsDictJsonCoder()
-      test_value = {'s': value}
-      self.assertEqual(test_value, coder.decode(coder.encode(test_value)))
-      self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message)
-
-  def test_invalid_json_nan(self):
-    self.json_compliance_exception(float('nan'))
-
-  def test_invalid_json_inf(self):
-    self.json_compliance_exception(float('inf'))
-
-  def test_invalid_json_neg_inf(self):
-    self.json_compliance_exception(float('-inf'))
-
-
-class TestTableRowJsonCoder(unittest.TestCase):
-
-  def test_row_as_table_row(self):
-    schema_definition = [
-        ('s', 'STRING'),
-        ('i', 'INTEGER'),
-        ('f', 'FLOAT'),
-        ('b', 'BOOLEAN'),
-        ('r', 'RECORD')]
-    data_definition = [
-        'abc',
-        123,
-        123.456,
-        True,
-        {'a': 'b'}]
-    str_def = '{"s": "abc", "i": 123, "f": 123.456, "b": true, "r": {"a": 
"b"}}'
-    schema = bigquery.TableSchema(
-        fields=[bigquery.TableFieldSchema(name=k, type=v)
-                for k, v in schema_definition])
-    coder = TableRowJsonCoder(table_schema=schema)
-    test_row = bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e)) for e in data_definition])
-
-    self.assertEqual(str_def, coder.encode(test_row))
-    self.assertEqual(test_row, coder.decode(coder.encode(test_row)))
-    # A coder without schema can still decode.
-    self.assertEqual(
-        test_row, TableRowJsonCoder().decode(coder.encode(test_row)))
-
-  def test_row_and_no_schema(self):
-    coder = TableRowJsonCoder()
-    test_row = bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e))
-           for e in ['abc', 123, 123.456, True]])
-    with self.assertRaises(AttributeError) as ctx:
-      coder.encode(test_row)
-    self.assertTrue(
-        ctx.exception.message.startswith('The TableRowJsonCoder requires'))
-
-  def json_compliance_exception(self, value):
-    with self.assertRaises(ValueError) as exn:
-      schema_definition = [('f', 'FLOAT')]
-      schema = bigquery.TableSchema(
-          fields=[bigquery.TableFieldSchema(name=k, type=v)
-                  for k, v in schema_definition])
-      coder = TableRowJsonCoder(table_schema=schema)
-      test_row = bigquery.TableRow(
-          f=[bigquery.TableCell(v=to_json_value(value))])
-      coder.encode(test_row)
-      self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message)
-
-  def test_invalid_json_nan(self):
-    self.json_compliance_exception(float('nan'))
-
-  def test_invalid_json_inf(self):
-    self.json_compliance_exception(float('inf'))
-
-  def test_invalid_json_neg_inf(self):
-    self.json_compliance_exception(float('-inf'))
-
-
-class TestTableSchemaParser(unittest.TestCase):
-  def test_parse_table_schema_from_json(self):
-    string_field = bigquery.TableFieldSchema(
-        name='s', type='STRING', mode='NULLABLE', description='s description')
-    number_field = bigquery.TableFieldSchema(
-        name='n', type='INTEGER', mode='REQUIRED', description='n description')
-    record_field = bigquery.TableFieldSchema(
-        name='r', type='RECORD', mode='REQUIRED', description='r description',
-        fields=[string_field, number_field])
-    expected_schema = bigquery.TableSchema(fields=[record_field])
-    json_str = json.dumps({'fields': [
-        {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
-         'description': 'r description', 'fields': [
-             {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
-              'description': 's description'},
-             {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
-              'description': 'n description'}]}]})
-    self.assertEqual(parse_table_schema_from_json(json_str),
-                     expected_schema)
-
-
-class TestBigQuerySource(unittest.TestCase):
-
-  def test_display_data_item_on_validate_true(self):
-    source = beam.io.BigQuerySource('dataset.table', validate=True)
-
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', True),
-        DisplayDataItemMatcher('table', 'dataset.table')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_table_reference_display_data(self):
-    source = beam.io.BigQuerySource('dataset.table')
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('table', 'dataset.table')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-    source = beam.io.BigQuerySource('project:dataset.table')
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('table', 'project:dataset.table')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation',
-                               False),
-        DisplayDataItemMatcher('table',
-                               'xyz.com:project:dataset.table')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_parse_table_reference(self):
-    source = beam.io.BigQuerySource('dataset.table')
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource('project:dataset.table')
-    self.assertEqual(source.table_reference.projectId, 'project')
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
-    self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource(query='my_query')
-    self.assertEqual(source.query, 'my_query')
-    self.assertIsNone(source.table_reference)
-    self.assertTrue(source.use_legacy_sql)
-
-  def test_query_only_display_data(self):
-    source = beam.io.BigQuerySource(query='my_query')
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('query', 'my_query')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_specify_query_sql_format(self):
-    source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
-    self.assertEqual(source.query, 'my_query')
-    self.assertFalse(source.use_legacy_sql)
-
-  def test_specify_query_flattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=False)
-    self.assertFalse(source.flatten_results)
-
-  def test_specify_query_unflattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=True)
-    self.assertTrue(source.flatten_results)
-
-  def test_specify_query_without_table(self):
-    source = beam.io.BigQuerySource(query='my_query')
-    self.assertEqual(source.query, 'my_query')
-    self.assertIsNone(source.table_reference)
-
-  def test_date_partitioned_table_name(self):
-    source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', True),
-        DisplayDataItemMatcher('table', 'dataset.table$20030102')]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-
-class TestBigQuerySink(unittest.TestCase):
-
-  def test_table_spec_display_data(self):
-    sink = beam.io.BigQuerySink('dataset.table')
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher('table', 'dataset.table'),
-        DisplayDataItemMatcher('validation', False)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_parse_schema_descriptor(self):
-    sink = beam.io.BigQuerySink(
-        'dataset.table', schema='s:STRING, n:INTEGER')
-    self.assertEqual(sink.table_reference.datasetId, 'dataset')
-    self.assertEqual(sink.table_reference.tableId, 'table')
-    result_schema = {
-        field.name: field.type for field in sink.table_schema.fields}
-    self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema)
-
-  def test_project_table_display_data(self):
-    sinkq = beam.io.BigQuerySink('PROJECT:dataset.table')
-    dd = DisplayData.create_from(sinkq)
-    expected_items = [
-        DisplayDataItemMatcher('table', 'PROJECT:dataset.table'),
-        DisplayDataItemMatcher('validation', False)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_simple_schema_as_json(self):
-    sink = beam.io.BigQuerySink(
-        'PROJECT:dataset.table', schema='s:STRING, n:INTEGER')
-    self.assertEqual(
-        json.dumps({'fields': [
-            {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
-            {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}),
-        sink.schema_as_json())
-
-  def test_nested_schema_as_json(self):
-    string_field = bigquery.TableFieldSchema(
-        name='s', type='STRING', mode='NULLABLE', description='s description')
-    number_field = bigquery.TableFieldSchema(
-        name='n', type='INTEGER', mode='REQUIRED', description='n description')
-    record_field = bigquery.TableFieldSchema(
-        name='r', type='RECORD', mode='REQUIRED', description='r description',
-        fields=[string_field, number_field])
-    schema = bigquery.TableSchema(fields=[record_field])
-    sink = beam.io.BigQuerySink('dataset.table', schema=schema)
-    self.assertEqual(
-        {'fields': [
-            {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
-             'description': 'r description', 'fields': [
-                 {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
-                  'description': 's description'},
-                 {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
-                  'description': 'n description'}]}]},
-        json.loads(sink.schema_as_json()))
-
-
-class TestBigQueryReader(unittest.TestCase):
-
-  def get_test_rows(self):
-    now = time.time()
-    dt = datetime.datetime.utcfromtimestamp(float(now))
-    ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
-    expected_rows = [
-        {
-            'i': 1,
-            's': 'abc',
-            'f': 2.3,
-            'b': True,
-            't': ts,
-            'dt': '2016-10-31',
-            'ts': '22:39:12.627498',
-            'dt_ts': '2008-12-25T07:30:00',
-            'r': {'s2': 'b'},
-            'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}]
-        },
-        {
-            'i': 10,
-            's': 'xyz',
-            'f': -3.14,
-            'b': False,
-            'rpr': [],
-            't': None,
-            'dt': None,
-            'ts': None,
-            'dt_ts': None,
-            'r': None,
-        }]
-
-    nested_schema = [
-        bigquery.TableFieldSchema(
-            name='s2', type='STRING', mode='NULLABLE')]
-    nested_schema_2 = [
-        bigquery.TableFieldSchema(
-            name='s3', type='STRING', mode='NULLABLE'),
-        bigquery.TableFieldSchema(
-            name='rpr2', type='RECORD', mode='REPEATED', fields=[
-                bigquery.TableFieldSchema(
-                    name='rs', type='STRING', mode='REPEATED'),
-                bigquery.TableFieldSchema(
-                    name='s4', type='STRING', mode='NULLABLE')])]
-
-    schema = bigquery.TableSchema(
-        fields=[
-            bigquery.TableFieldSchema(
-                name='b', type='BOOLEAN', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='f', type='FLOAT', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='i', type='INTEGER', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='s', type='STRING', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='t', type='TIMESTAMP', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='dt', type='DATE', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='ts', type='TIME', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='dt_ts', type='DATETIME', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='r', type='RECORD', mode='NULLABLE',
-                fields=nested_schema),
-            bigquery.TableFieldSchema(
-                name='rpr', type='RECORD', mode='REPEATED',
-                fields=nested_schema_2)])
-
-    table_rows = [
-        bigquery.TableRow(f=[
-            bigquery.TableCell(v=to_json_value('true')),
-            bigquery.TableCell(v=to_json_value(str(2.3))),
-            bigquery.TableCell(v=to_json_value(str(1))),
-            bigquery.TableCell(v=to_json_value('abc')),
-            # For timestamps cannot use str() because it will truncate the
-            # number representing the timestamp.
-            bigquery.TableCell(v=to_json_value('%f' % now)),
-            bigquery.TableCell(v=to_json_value('2016-10-31')),
-            bigquery.TableCell(v=to_json_value('22:39:12.627498')),
-            bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')),
-            # For record we cannot use dict because it doesn't create nested
-            # schemas correctly so we have to use this f,v based format
-            bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})),
-            bigquery.TableCell(v=to_json_value([{'v':{'f':[{'v': 'c'}, {'v':[
-                {'v':{'f':[{'v':[{'v':'d'}, {'v':'e'}]}, {'v':None}]}}]}]}}]))
-            ]),
-        bigquery.TableRow(f=[
-            bigquery.TableCell(v=to_json_value('false')),
-            bigquery.TableCell(v=to_json_value(str(-3.14))),
-            bigquery.TableCell(v=to_json_value(str(10))),
-            bigquery.TableCell(v=to_json_value('xyz')),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=to_json_value([]))])]
-    return table_rows, schema, expected_rows
-
-  def test_read_from_table(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-
-  def test_read_from_query(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(query='query').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertTrue(reader.use_legacy_sql)
-    self.assertTrue(reader.flatten_results)
-
-  def test_read_from_query_sql_format(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(
-        query='query', use_standard_sql=True).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertFalse(reader.use_legacy_sql)
-    self.assertTrue(reader.flatten_results)
-
-  def test_read_from_query_unflatten_records(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(
-        query='query', flatten_results=False).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertTrue(reader.use_legacy_sql)
-    self.assertFalse(reader.flatten_results)
-
-  def test_using_both_query_and_table_fails(self):
-    with self.assertRaises(ValueError) as exn:
-      beam.io.BigQuerySource(table='dataset.table', query='query')
-      self.assertEqual(exn.exception.message, 'Both a BigQuery table and a'
-                       ' query were specified. Please specify only one of '
-                       'these.')
-
-  def test_using_neither_query_nor_table_fails(self):
-    with self.assertRaises(ValueError) as exn:
-      beam.io.BigQuerySource()
-      self.assertEqual(exn.exception.message, 'A BigQuery table or a query'
-                       ' must be specified')
-
-  def test_read_from_table_as_tablerows(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, _ = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    # We set the coder to TableRowJsonCoder, which is a signal that
-    # the caller wants to see the rows as TableRows.
-    with beam.io.BigQuerySource(
-        'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, table_rows)
-    self.assertEqual(schema, reader.schema)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_read_from_table_and_job_complete_retry(self, patched_time_sleep):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    # Return jobComplete=False on first call to trigger the code path where
-    # query needs to handle waiting a bit.
-    client.jobs.GetQueryResults.side_effect = [
-        bigquery.GetQueryResultsResponse(
-            jobComplete=False),
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema)]
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-
-  def test_read_from_table_and_multiple_pages(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    # Return a pageToken on first call to trigger the code path where
-    # query needs to handle multiple pages of results.
-    client.jobs.GetQueryResults.side_effect = [
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema,
-            pageToken='token'),
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema)]
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    # We return expected rows for each of the two pages of results so we
-    # adjust our expectation below accordingly.
-    self.assertEqual(actual_rows, expected_rows * 2)
-
-  def test_table_schema_without_project(self):
-    # Reader should pick executing project by default.
-    source = beam.io.BigQuerySource(table='mydataset.mytable')
-    options = PipelineOptions(flags=['--project', 'myproject'])
-    source.pipeline_options = options
-    reader = source.reader()
-    self.assertEquals('SELECT * FROM [myproject:mydataset.mytable];',
-                      reader.query)
-
-
-class TestBigQueryWriter(unittest.TestCase):
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_no_table_and_create_never(self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
-    with self.assertRaises(RuntimeError) as exn:
-      with beam.io.BigQuerySink(
-          'project:dataset.table',
-          create_disposition=create_disposition).writer(client):
-        pass
-    self.assertEqual(
-        exn.exception.message,
-        'Table project:dataset.table not found but create disposition is '
-        'CREATE_NEVER.')
-
-  def test_no_table_and_create_if_needed(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    client.tables.Insert.return_value = table
-    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        schema='somefield:INTEGER',
-        create_disposition=create_disposition).writer(client):
-      pass
-    self.assertTrue(client.tables.Get.called)
-    self.assertTrue(client.tables.Insert.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_no_table_and_create_if_needed_and_no_schema(
-      self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
-    with self.assertRaises(RuntimeError) as exn:
-      with beam.io.BigQuerySink(
-          'project:dataset.table',
-          create_disposition=create_disposition).writer(client):
-        pass
-    self.assertEqual(
-        exn.exception.message,
-        'Table project:dataset.table requires a schema. None can be inferred '
-        'because the table does not exist.')
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_table_not_empty_and_write_disposition_empty(
-      self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Get.return_value = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tabledata.List.return_value = bigquery.TableDataList(totalRows=1)
-    write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY
-    with self.assertRaises(RuntimeError) as exn:
-      with beam.io.BigQuerySink(
-          'project:dataset.table',
-          write_disposition=write_disposition).writer(client):
-        pass
-    self.assertEqual(
-        exn.exception.message,
-        'Table project:dataset.table is not empty but write disposition is '
-        'WRITE_EMPTY.')
-
-  def test_table_empty_and_write_disposition_empty(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.return_value = table
-    client.tabledata.List.return_value = bigquery.TableDataList(totalRows=0)
-    client.tables.Insert.return_value = table
-    write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        write_disposition=write_disposition).writer(client):
-      pass
-    self.assertTrue(client.tables.Get.called)
-    self.assertTrue(client.tabledata.List.called)
-    self.assertFalse(client.tables.Delete.called)
-    self.assertFalse(client.tables.Insert.called)
-
-  def test_table_with_write_disposition_truncate(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.return_value = table
-    client.tables.Insert.return_value = table
-    write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        write_disposition=write_disposition).writer(client):
-      pass
-    self.assertTrue(client.tables.Get.called)
-    self.assertTrue(client.tables.Delete.called)
-    self.assertTrue(client.tables.Insert.called)
-
-  def test_table_with_write_disposition_append(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.return_value = table
-    client.tables.Insert.return_value = table
-    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        write_disposition=write_disposition).writer(client):
-      pass
-    self.assertTrue(client.tables.Get.called)
-    self.assertFalse(client.tables.Delete.called)
-    self.assertFalse(client.tables.Insert.called)
-
-  def test_rows_are_written(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.return_value = table
-    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-
-    insert_response = mock.Mock()
-    insert_response.insertErrors = []
-    client.tabledata.InsertAll.return_value = insert_response
-
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        write_disposition=write_disposition).writer(client) as writer:
-      writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14})
-
-    sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
-    expected_rows = []
-    json_object = bigquery.JsonObject()
-    for k, v in sample_row.iteritems():
-      json_object.additionalProperties.append(
-          bigquery.JsonObject.AdditionalProperty(
-              key=k, value=to_json_value(v)))
-    expected_rows.append(
-        bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-            insertId='_1',  # First row ID generated with prefix ''
-            json=json_object))
-    client.tabledata.InsertAll.assert_called_with(
-        bigquery.BigqueryTabledataInsertAllRequest(
-            projectId='project', datasetId='dataset', tableId='table',
-            tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-                rows=expected_rows)))
-
-  def test_table_schema_without_project(self):
-    # Writer should pick executing project by default.
-    sink = beam.io.BigQuerySink(table='mydataset.mytable')
-    options = PipelineOptions(flags=['--project', 'myproject'])
-    sink.pipeline_options = options
-    writer = sink.writer()
-    self.assertEquals('myproject', writer.project_id)
-
-
-class TestBigQueryWrapper(unittest.TestCase):
-
-  def test_delete_non_existing_dataset(self):
-    client = mock.Mock()
-    client.datasets.Delete.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    wrapper._delete_dataset('', '')
-    self.assertTrue(client.datasets.Delete.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_delete_dataset_retries_fail(self, patched_time_sleep):
-    client = mock.Mock()
-    client.datasets.Delete.side_effect = ValueError("Cannot delete")
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    with self.assertRaises(ValueError) as _:
-      wrapper._delete_dataset('', '')
-    self.assertEqual(
-        beam.io.bigquery.MAX_RETRIES + 1, client.datasets.Delete.call_count)
-    self.assertTrue(client.datasets.Delete.called)
-
-  def test_delete_non_existing_table(self):
-    client = mock.Mock()
-    client.tables.Delete.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    wrapper._delete_table('', '', '')
-    self.assertTrue(client.tables.Delete.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_delete_table_retries_fail(self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Delete.side_effect = ValueError("Cannot delete")
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    with self.assertRaises(ValueError) as _:
-      wrapper._delete_table('', '', '')
-    self.assertTrue(client.tables.Delete.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
-    client = mock.Mock()
-    client.datasets.Delete.side_effect = [
-        HttpError(
-            response={'status': '408'}, url='', content=''),
-        bigquery.BigqueryDatasetsDeleteResponse()
-    ]
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    wrapper._delete_dataset('', '')
-    self.assertTrue(client.datasets.Delete.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Delete.side_effect = [
-        HttpError(
-            response={'status': '408'}, url='', content=''),
-        bigquery.BigqueryTablesDeleteResponse()
-    ]
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    wrapper._delete_table('', '', '')
-    self.assertTrue(client.tables.Delete.called)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_temporary_dataset_is_unique(self, patched_time_sleep):
-    client = mock.Mock()
-    client.datasets.Get.return_value = bigquery.Dataset(
-        datasetReference=bigquery.DatasetReference(
-            projectId='project_id', datasetId='dataset_id'))
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    with self.assertRaises(RuntimeError) as _:
-      wrapper.create_temporary_dataset('project_id')
-    self.assertTrue(client.datasets.Get.called)
-
-  def test_get_or_create_dataset_created(self):
-    client = mock.Mock()
-    client.datasets.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    client.datasets.Insert.return_value = bigquery.Dataset(
-        datasetReference=bigquery.DatasetReference(
-            projectId='project_id', datasetId='dataset_id'))
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
-    self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
-
-  def test_get_or_create_dataset_fetched(self):
-    client = mock.Mock()
-    client.datasets.Get.return_value = bigquery.Dataset(
-        datasetReference=bigquery.DatasetReference(
-            projectId='project_id', datasetId='dataset_id'))
-    wrapper = beam.io.bigquery.BigQueryWrapper(client)
-    new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
-    self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()
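
With this change, the BigQuery helpers exercised by the deleted test above move under the apache_beam.io.google_cloud_platform package (the internal clients import in the deleted file already points at that package). A minimal sketch of the equivalent imports against the relocated module, assuming RowAsDictJsonCoder, TableRowJsonCoder and parse_table_schema_from_json keep the same public names in the new location:

    # Sketch only: assumes the relocated module exposes the same public names.
    from apache_beam.io.google_cloud_platform.bigquery import RowAsDictJsonCoder
    from apache_beam.io.google_cloud_platform.bigquery import TableRowJsonCoder
    from apache_beam.io.google_cloud_platform.bigquery import (
        parse_table_schema_from_json)
    from apache_beam.io.google_cloud_platform.internal.clients import bigquery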

http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/datastore/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/io/datastore/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/908c8532/sdks/python/apache_beam/io/datastore/v1/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/io/datastore/v1/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
