[ 
https://issues.apache.org/jira/browse/BEAM-6457?focusedWorklogId=190137&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190137
 ]

ASF GitHub Bot logged work on BEAM-6457:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jan/19 18:35
            Start Date: 25/Jan/19 18:35
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #7605: [BEAM-6457] 
Refactoring of a few BigQuery classes.
URL: https://github.com/apache/beam/pull/7605#discussion_r251089745
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_test.py
 ##########
 @@ -338,521 +271,6 @@ def test_nested_schema_as_json(self):
         json.loads(sink.schema_as_json()))
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestBigQueryReader(unittest.TestCase):
-
-  def get_test_rows(self):
-    now = time.time()
-    dt = datetime.datetime.utcfromtimestamp(float(now))
-    ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
-    expected_rows = [
-        {
-            'i': 1,
-            's': 'abc',
-            'f': 2.3,
-            'b': True,
-            't': ts,
-            'dt': '2016-10-31',
-            'ts': '22:39:12.627498',
-            'dt_ts': '2008-12-25T07:30:00',
-            'r': {'s2': 'b'},
-            'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}]
-        },
-        {
-            'i': 10,
-            's': 'xyz',
-            'f': -3.14,
-            'b': False,
-            'rpr': [],
-            't': None,
-            'dt': None,
-            'ts': None,
-            'dt_ts': None,
-            'r': None,
-        }]
-
-    nested_schema = [
-        bigquery.TableFieldSchema(
-            name='s2', type='STRING', mode='NULLABLE')]
-    nested_schema_2 = [
-        bigquery.TableFieldSchema(
-            name='s3', type='STRING', mode='NULLABLE'),
-        bigquery.TableFieldSchema(
-            name='rpr2', type='RECORD', mode='REPEATED', fields=[
-                bigquery.TableFieldSchema(
-                    name='rs', type='STRING', mode='REPEATED'),
-                bigquery.TableFieldSchema(
-                    name='s4', type='STRING', mode='NULLABLE')])]
-
-    schema = bigquery.TableSchema(
-        fields=[
-            bigquery.TableFieldSchema(
-                name='b', type='BOOLEAN', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='f', type='FLOAT', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='i', type='INTEGER', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='s', type='STRING', mode='REQUIRED'),
-            bigquery.TableFieldSchema(
-                name='t', type='TIMESTAMP', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='dt', type='DATE', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='ts', type='TIME', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='dt_ts', type='DATETIME', mode='NULLABLE'),
-            bigquery.TableFieldSchema(
-                name='r', type='RECORD', mode='NULLABLE',
-                fields=nested_schema),
-            bigquery.TableFieldSchema(
-                name='rpr', type='RECORD', mode='REPEATED',
-                fields=nested_schema_2)])
-
-    table_rows = [
-        bigquery.TableRow(f=[
-            bigquery.TableCell(v=to_json_value('true')),
-            bigquery.TableCell(v=to_json_value(str(2.3))),
-            bigquery.TableCell(v=to_json_value(str(1))),
-            bigquery.TableCell(v=to_json_value('abc')),
-            # For timestamps cannot use str() because it will truncate the
-            # number representing the timestamp.
-            bigquery.TableCell(v=to_json_value('%f' % now)),
-            bigquery.TableCell(v=to_json_value('2016-10-31')),
-            bigquery.TableCell(v=to_json_value('22:39:12.627498')),
-            bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')),
-            # For record we cannot use dict because it doesn't create nested
-            # schemas correctly so we have to use this f,v based format
-            bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})),
-            bigquery.TableCell(v=to_json_value([{'v':{'f':[{'v': 'c'}, {'v':[
-                {'v':{'f':[{'v':[{'v':'d'}, {'v':'e'}]}, {'v':None}]}}]}]}}]))
-            ]),
-        bigquery.TableRow(f=[
-            bigquery.TableCell(v=to_json_value('false')),
-            bigquery.TableCell(v=to_json_value(str(-3.14))),
-            bigquery.TableCell(v=to_json_value(str(10))),
-            bigquery.TableCell(v=to_json_value('xyz')),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            bigquery.TableCell(v=None),
-            # REPEATED field without any values.
-            bigquery.TableCell(v=None)])]
-    return table_rows, schema, expected_rows
-
-  def test_read_from_table(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = 
bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-
-  def test_read_from_query(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = 
bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(query='query').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertTrue(reader.use_legacy_sql)
-    self.assertTrue(reader.flatten_results)
-
-  def test_read_from_query_sql_format(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = 
bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(
-        query='query', use_standard_sql=True).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertFalse(reader.use_legacy_sql)
-    self.assertTrue(reader.flatten_results)
-
-  def test_read_from_query_unflatten_records(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = 
bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    with beam.io.BigQuerySource(
-        query='query', flatten_results=False).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-    self.assertEqual(schema, reader.schema)
-    self.assertTrue(reader.use_legacy_sql)
-    self.assertFalse(reader.flatten_results)
-
-  def test_using_both_query_and_table_fails(self):
-    with self.assertRaisesRegexp(
-        ValueError,
-        r'Both a BigQuery table and a query were specified\. Please specify '
-        r'only one of these'):
-      beam.io.BigQuerySource(table='dataset.table', query='query')
-
-  def test_using_neither_query_nor_table_fails(self):
-    with self.assertRaisesRegexp(
-        ValueError, r'A BigQuery table or a query must be specified'):
-      beam.io.BigQuerySource()
-
-  def test_read_from_table_as_tablerows(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, _ = self.get_test_rows()
-    client.jobs.GetQueryResults.return_value = 
bigquery.GetQueryResultsResponse(
-        jobComplete=True, rows=table_rows, schema=schema)
-    actual_rows = []
-    # We set the coder to TableRowJsonCoder, which is a signal that
-    # the caller wants to see the rows as TableRows.
-    with beam.io.BigQuerySource(
-        'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, table_rows)
-    self.assertEqual(schema, reader.schema)
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_read_from_table_and_job_complete_retry(self, patched_time_sleep):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    # Return jobComplete=False on first call to trigger the code path where
-    # query needs to handle waiting a bit.
-    client.jobs.GetQueryResults.side_effect = [
-        bigquery.GetQueryResultsResponse(
-            jobComplete=False),
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema)]
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    self.assertEqual(actual_rows, expected_rows)
-
-  def test_read_from_table_and_multiple_pages(self):
-    client = mock.Mock()
-    client.jobs.Insert.return_value = bigquery.Job(
-        jobReference=bigquery.JobReference(
-            jobId='somejob'))
-    table_rows, schema, expected_rows = self.get_test_rows()
-    # Return a pageToken on first call to trigger the code path where
-    # query needs to handle multiple pages of results.
-    client.jobs.GetQueryResults.side_effect = [
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema,
-            pageToken='token'),
-        bigquery.GetQueryResultsResponse(
-            jobComplete=True, rows=table_rows, schema=schema)]
-    actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
-      for row in reader:
-        actual_rows.append(row)
-    # We return expected rows for each of the two pages of results so we
-    # adjust our expectation below accordingly.
-    self.assertEqual(actual_rows, expected_rows * 2)
-
-  def test_table_schema_without_project(self):
-    # Reader should pick executing project by default.
-    source = beam.io.BigQuerySource(table='mydataset.mytable')
-    options = PipelineOptions(flags=['--project', 'myproject'])
-    source.pipeline_options = options
-    reader = source.reader()
-    self.assertEquals('SELECT * FROM [myproject:mydataset.mytable];',
-                      reader.query)
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestBigQueryWriter(unittest.TestCase):
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_no_table_and_create_never(self, patched_time_sleep):
-    client = mock.Mock()
-    client.tables.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
-    with self.assertRaisesRegexp(
-        RuntimeError, r'Table project:dataset\.table not found but create '
-                      r'disposition is CREATE_NEVER'):
-      with beam.io.BigQuerySink(
-          'project:dataset.table',
-          create_disposition=create_disposition).writer(client):
-        pass
-
-  def test_no_table_and_create_if_needed(self):
-    client = mock.Mock()
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project', datasetId='dataset', tableId='table'),
-        schema=bigquery.TableSchema())
-    client.tables.Get.side_effect = HttpError(
-        response={'status': '404'}, url='', content='')
-    client.tables.Insert.return_value = table
-    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
-    with beam.io.BigQuerySink(
-        'project:dataset.table',
-        schema='somefield:INTEGER',
-        create_disposition=create_disposition).writer(client):
-      pass
-    self.assertTrue(client.tables.Get.called)
-    self.assertTrue(client.tables.Insert.called)
-
-  @mock.patch('time.sleep', return_value=None)
 
 Review comment:
   Yes, these are all moves : D
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 190137)
    Time Spent: 3h  (was: 2h 50m)

> bigquery.py is too large, and some tools are better moved elsewhere
> -------------------------------------------------------------------
>
>                 Key: BEAM-6457
>                 URL: https://issues.apache.org/jira/browse/BEAM-6457
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Need to do a bit of refactoring of that file



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to