This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 92b9977 [BEAM-12810] Reverting PR-15185 (#15402) 92b9977 is described below commit 92b9977d71dfbf3ce7d3f42fe0d6cdb8e384a0d0 Author: Vachan <52260220+vachan-she...@users.noreply.github.com> AuthorDate: Fri Aug 27 09:51:36 2021 -0400 [BEAM-12810] Reverting PR-15185 (#15402) * Reverting PR-15185 * Lint fixes. * Formatting fixes. * More formatting fixes. --- sdks/python/apache_beam/io/gcp/bigquery.py | 340 +-------------------- .../apache_beam/io/gcp/bigquery_read_it_test.py | 159 ---------- sdks/python/setup.py | 31 +- 3 files changed, 26 insertions(+), 504 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 4e5f639..88093b1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -270,7 +270,6 @@ encoding when writing to BigQuery. # pytype: skip-file import collections -import io import itertools import json import logging @@ -278,20 +277,13 @@ import random import time import uuid from typing import Dict -from typing import List -from typing import Optional from typing import Union -import avro.schema -import fastavro -from avro import io as avroio - import apache_beam as beam from apache_beam import coders from apache_beam import pvalue from apache_beam.internal.gcp.json_value import from_json_value from apache_beam.internal.gcp.json_value import to_json_value -from apache_beam.io import range_trackers from apache_beam.io.avroio import _create_avro_source as create_avro_source from apache_beam.io.filesystems import CompressionTypes from apache_beam.io.filesystems import FileSystems @@ -332,7 +324,6 @@ from apache_beam.utils.annotations import experimental try: from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference from apache_beam.io.gcp.internal.clients.bigquery import TableReference - import google.cloud.bigquery_storage_v1 as bq_storage except ImportError: DatasetReference = None TableReference = None @@ -896,276 +887,6 @@ class _CustomBigQuerySource(BoundedSource): return table.schema, metadata_list -class _CustomBigQueryStorageSourceBase(BoundedSource): - """A base class for BoundedSource implementations which read from BigQuery - using the BigQuery Storage API. - - Args: - table (str, TableReference): The ID of the table. The ID must contain only - letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_`` If - **dataset** argument is :data:`None` then the table argument must - contain the entire table reference specified as: - ``'PROJECT:DATASET.TABLE'`` or must specify a TableReference. - dataset (str): Optional ID of the dataset containing this table or - :data:`None` if the table argument specifies a TableReference. - project (str): Optional ID of the project containing this table or - :data:`None` if the table argument specifies a TableReference. - selected_fields (List[str]): Optional List of names of the fields in the - table that should be read. If empty, all fields will be read. If the - specified field is a nested field, all the sub-fields in the field will be - selected. The output field order is unrelated to the order of fields in - selected_fields. - row_restriction (str): Optional SQL text filtering statement, similar to a - WHERE clause in a query. Aggregates are not supported. Restricted to a - maximum length for 1 MB. - """ - - # The maximum number of streams which will be requested when creating a read - # session, regardless of the desired bundle size. 
- MAX_SPLIT_COUNT = 10000 - # The minimum number of streams which will be requested when creating a read - # session, regardless of the desired bundle size. Note that the server may - # still choose to return fewer than ten streams based on the layout of the - # table. - MIN_SPLIT_COUNT = 10 - - def __init__( - self, - table: Union[str, TableReference], - dataset: Optional[str] = None, - project: Optional[str] = None, - selected_fields: Optional[List[str]] = None, - row_restriction: Optional[str] = None, - use_fastavro_for_direct_read: Optional[bool] = None, - pipeline_options: Optional[GoogleCloudOptions] = None): - - self.table_reference = bigquery_tools.parse_table_reference( - table, dataset, project) - self.table = self.table_reference.tableId - self.dataset = self.table_reference.datasetId - self.project = self.table_reference.projectId - self.selected_fields = selected_fields - self.row_restriction = row_restriction - self.use_fastavro = \ - True if use_fastavro_for_direct_read is None else \ - use_fastavro_for_direct_read - self.pipeline_options = pipeline_options - self.split_result = None - - def _get_parent_project(self): - """Returns the project that will be billed.""" - project = self.pipeline_options.view_as(GoogleCloudOptions).project - if isinstance(project, vp.ValueProvider): - project = project.get() - if not project: - project = self.project - return project - - def _get_table_size(self, table, dataset, project): - if project is None: - project = self._get_parent_project() - - bq = bigquery_tools.BigQueryWrapper() - table = bq.get_table(project, dataset, table) - return table.numBytes - - def display_data(self): - return { - 'project': str(self.project), - 'dataset': str(self.dataset), - 'table': str(self.table), - 'selected_fields': str(self.selected_fields), - 'row_restriction': str(self.row_restriction), - 'use_fastavro': str(self.use_fastavro) - } - - def estimate_size(self): - # Returns the pre-filtering size of the table being read. 
- return self._get_table_size(self.table, self.dataset, self.project) - - def split(self, desired_bundle_size, start_position=None, stop_position=None): - requested_session = bq_storage.types.ReadSession() - requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format( - self.project, self.dataset, self.table) - requested_session.data_format = bq_storage.types.DataFormat.AVRO - if self.selected_fields is not None: - requested_session.read_options.selected_fields = self.selected_fields - if self.row_restriction is not None: - requested_session.read_options.row_restriction = self.row_restriction - - storage_client = bq_storage.BigQueryReadClient() - stream_count = 0 - if desired_bundle_size > 0: - table_size = self._get_table_size(self.table, self.dataset, self.project) - stream_count = min( - int(table_size / desired_bundle_size), - _CustomBigQueryStorageSourceBase.MAX_SPLIT_COUNT) - stream_count = max( - stream_count, _CustomBigQueryStorageSourceBase.MIN_SPLIT_COUNT) - - parent = 'projects/{}'.format(self.project) - read_session = storage_client.create_read_session( - parent=parent, - read_session=requested_session, - max_stream_count=stream_count) - _LOGGER.info( - 'Sent BigQuery Storage API CreateReadSession request: \n %s \n' - 'Received response \n %s.', - requested_session, - read_session) - - self.split_result = [ - _CustomBigQueryStorageStreamSource(stream.name, self.use_fastavro) - for stream in read_session.streams - ] - - for source in self.split_result: - yield SourceBundle( - weight=1.0, source=source, start_position=None, stop_position=None) - - def get_range_tracker(self, start_position, stop_position): - class NonePositionRangeTracker(RangeTracker): - """A RangeTracker that always returns positions as None. Prevents the - BigQuery Storage source from being read() before being split().""" - def start_position(self): - return None - - def stop_position(self): - return None - - return NonePositionRangeTracker() - - def read(self, range_tracker): - raise NotImplementedError( - 'BigQuery storage source must be split before being read') - - -class _CustomBigQueryStorageStreamSource(BoundedSource): - """A source representing a single stream in a read session.""" - def __init__(self, read_stream_name: str, use_fastavro: bool): - self.read_stream_name = read_stream_name - self.use_fastavro = use_fastavro - - def display_data(self): - return { - 'read_stream': str(self.read_stream_name), - } - - def estimate_size(self): - # The size of stream source cannot be estimate due to server-side liquid - # sharding. - # TODO: Implement progress reporting. - return None - - def split(self, desired_bundle_size, start_position=None, stop_position=None): - # A stream source can't be split without reading from it due to - # server-side liquid sharding. A split will simply return the current source - # for now. - return SourceBundle( - weight=1.0, - source=_CustomBigQueryStorageStreamSource( - self.read_stream_name, self.use_fastavro), - start_position=None, - stop_position=None) - - def get_range_tracker(self, start_position, stop_position): - # TODO: Implement dynamic work rebalancing. - assert start_position is None - # Defaulting to the start of the stream. - start_position = 0 - # Since the streams are unsplittable we choose OFFSET_INFINITY as the - # default end offset so that all data of the source gets read. 
- stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY - range_tracker = range_trackers.OffsetRangeTracker( - start_position, stop_position) - # Ensuring that all try_split() calls will be ignored by the Rangetracker. - range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker) - - return range_tracker - - def read(self, range_tracker): - _LOGGER.info( - "Started BigQuery Storage API read from stream %s.", - self.read_stream_name) - storage_client = bq_storage.BigQueryReadClient() - read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name)) - # Handling the case where the user might provide very selective filters - # which can result in read_rows_response being empty. - first_read_rows_response = next(read_rows_iterator, None) - if first_read_rows_response is None: - return iter([]) - - if self.use_fastavro: - row_reader = _ReadRowsResponseReaderWithFastAvro( - read_rows_iterator, first_read_rows_response) - return iter(row_reader) - - row_reader = _ReadRowsResponseReader( - read_rows_iterator, first_read_rows_response) - return iter(row_reader) - - -class _ReadRowsResponseReaderWithFastAvro(): - """An iterator that deserializes ReadRowsResponses using the fastavro - library.""" - def __init__(self, read_rows_iterator, read_rows_response): - self.read_rows_iterator = read_rows_iterator - self.read_rows_response = read_rows_response - self.avro_schema = fastavro.parse_schema( - json.loads(self.read_rows_response.avro_schema.schema)) - self.bytes_reader = io.BytesIO( - self.read_rows_response.avro_rows.serialized_binary_rows) - - def __iter__(self): - return self - - def __next__(self): - try: - return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema) - except StopIteration: - self.read_rows_response = next(self.read_rows_iterator, None) - if self.read_rows_response is not None: - self.bytes_reader = io.BytesIO( - self.read_rows_response.avro_rows.serialized_binary_rows) - return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema) - else: - raise StopIteration - - -class _ReadRowsResponseReader(): - """An iterator that deserializes ReadRowsResponses.""" - def __init__(self, read_rows_iterator, read_rows_response): - self.read_rows_iterator = read_rows_iterator - self.read_rows_response = read_rows_response - self.avro_schema = avro.schema.Parse( - self.read_rows_response.avro_schema.schema) - self.reader = avroio.DatumReader(self.avro_schema) - self.decoder = avroio.BinaryDecoder( - io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows)) - self.next_row = 0 - - def __iter__(self): - return self - - def get_deserialized_row(self): - deserialized_row = self.reader.read(self.decoder) - self.next_row += 1 - return deserialized_row - - def __next__(self): - if self.next_row < self.read_rows_response.row_count: - return self.get_deserialized_row() - - self.read_rows_response = next(self.read_rows_iterator, None) - if self.read_rows_response is not None: - self.decoder = avroio.BinaryDecoder( - io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows)) - self.next_row = 0 - return self.get_deserialized_row() - else: - raise StopIteration - - @deprecated(since='2.11.0', current="WriteToBigQuery") class BigQuerySink(dataflow_io.NativeSink): """A sink based on a BigQuery table. @@ -2120,38 +1841,19 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, class ReadFromBigQuery(PTransform): """Read data from BigQuery. 
- This PTransform uses either a BigQuery export job to take a snapshot of the - table on GCS, and then reads from each produced file (EXPORT) or reads - directly from BigQuery storage using BigQuery Read API (DIRECT_READ). The - option is specified using the 'method' :parameter. File format is Avro by + This PTransform uses a BigQuery export job to take a snapshot of the table + on GCS, and then reads from each produced file. File format is Avro by default. - NOTE: DIRECT_READ only supports reading from BigQuery Tables currently. To - read the results of a query please use EXPORT. - - .. warning:: - DATETIME columns are parsed as strings in the fastavro library. As a - result, such columns will be converted to Python strings instead of native - Python DATETIME types. - Args: - method: The method to use to read from BigQuery. It may be EXPORT or - DIRECT_READ. EXPORT invokes a BigQuery export request - (https://cloud.google.com/bigquery/docs/exporting-data). DIRECT_READ reads - directly from BigQuery storage using the BigQuery Read API - (https://cloud.google.com/bigquery/docs/reference/storage). If - unspecified, the default is currently EXPORT. - use_fastavro_for_direct_read (bool): If method is `DIRECT_READ` and - :data:`True`, the fastavro library is used to deserialize the data - received from the BigQuery Read API. The default here is :data:`True`. table (str, callable, ValueProvider): The ID of the table, or a callable that returns it. The ID must contain only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset argument is :data:`None` then the table argument must contain the entire table - reference specified as: ``'PROJECT:DATASET.TABLE'``. - If it's a callable, it must receive one argument representing an element - to be written to BigQuery, and return a TableReference, or a string table - name as specified above. + reference specified as: ``'DATASET.TABLE'`` + or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one + argument representing an element to be written to BigQuery, and return + a TableReference, or a string table name as specified above. dataset (str): The ID of the dataset containing this table or :data:`None` if the table reference is specified entirely by the table argument. @@ -2212,22 +1914,16 @@ class ReadFromBigQuery(PTransform): to create and delete tables within the given dataset. Dataset name should *not* start with the reserved prefix `beam_temp_dataset_`. """ - class Method(object): - EXPORT = 'EXPORT' # This is currently the default. - DIRECT_READ = 'DIRECT_READ' COUNTER = 0 - def __init__(self, gcs_location=None, method=None, *args, **kwargs): - self.method = method or ReadFromBigQuery.Method.EXPORT - - if gcs_location and self.method is ReadFromBigQuery.Method.EXPORT: + def __init__(self, gcs_location=None, *args, **kwargs): + if gcs_location: if not isinstance(gcs_location, (str, ValueProvider)): raise TypeError( '%s: gcs_location must be of type string' ' or ValueProvider; got %r instead' % (self.__class__.__name__, type(gcs_location))) - if isinstance(gcs_location, str): gcs_location = StaticValueProvider(str, gcs_location) @@ -2236,17 +1932,6 @@ class ReadFromBigQuery(PTransform): self._kwargs = kwargs def expand(self, pcoll): - # TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation. 
- if self.method is ReadFromBigQuery.Method.EXPORT: - return self._expand_export(pcoll) - elif self.method is ReadFromBigQuery.Method.DIRECT_READ: - return self._expand_direct_read(pcoll) - else: - raise ValueError( - 'The method to read from BigQuery must be either EXPORT' - 'or DIRECT_READ.') - - def _expand_export(self, pcoll): temp_location = pcoll.pipeline.options.view_as( GoogleCloudOptions).temp_location job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name @@ -2281,15 +1966,6 @@ class ReadFromBigQuery(PTransform): **self._kwargs)) | _PassThroughThenCleanup(files_to_remove_pcoll)) - def _expand_direct_read(self, pcoll): - return ( - pcoll - | beam.io.Read( - _CustomBigQueryStorageSourceBase( - pipeline_options=pcoll.pipeline.options, - *self._args, - **self._kwargs))) - class ReadFromBigQueryRequest: """ diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index 53bf567..9216a9c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -26,14 +26,12 @@ import logging import random import time import unittest -import uuid from decimal import Decimal from functools import wraps import pytest import apache_beam as beam -from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.options.value_provider import StaticValueProvider @@ -177,163 +175,6 @@ class ReadTests(BigQueryReadIntegrationTests): assert_that(result, equal_to(self.TABLE_DATA)) -class ReadUsingStorageApiTests(BigQueryReadIntegrationTests): - TABLE_DATA = [{ - 'number': 1, 'str': 'abc' - }, { - 'number': 2, 'str': 'def' - }, { - 'number': 3, 'str': u'你好' - }, { - 'number': 4, 'str': u'привет' - }] - - @classmethod - def setUpClass(cls): - super(ReadUsingStorageApiTests, cls).setUpClass() - cls.table_name = 'python_read_table' - cls._create_table(cls.table_name) - - table_id = '{}.{}'.format(cls.dataset_id, cls.table_name) - cls.query = 'SELECT number, str FROM `%s`' % table_id - - # Materializing the newly created Table to ensure the Read API can stream. 
- cls.temp_table_reference = cls._execute_query(cls.project, cls.query) - - @classmethod - def tearDownClass(cls): - cls.bigquery_client.clean_up_temporary_dataset(cls.project) - super(ReadUsingStorageApiTests, cls).tearDownClass() - - @classmethod - def _create_table(cls, table_name): - table_schema = bigquery.TableSchema() - table_field = bigquery.TableFieldSchema() - table_field.name = 'number' - table_field.type = 'INTEGER' - table_schema.fields.append(table_field) - table_field = bigquery.TableFieldSchema() - table_field.name = 'str' - table_field.type = 'STRING' - table_schema.fields.append(table_field) - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId=cls.project, datasetId=cls.dataset_id, - tableId=table_name), - schema=table_schema) - request = bigquery.BigqueryTablesInsertRequest( - projectId=cls.project, datasetId=cls.dataset_id, table=table) - cls.bigquery_client.client.tables.Insert(request) - cls.bigquery_client.insert_rows( - cls.project, cls.dataset_id, table_name, cls.TABLE_DATA) - - @classmethod - def _setup_temporary_dataset(cls, project, query): - location = cls.bigquery_client.get_query_location(project, query, False) - cls.bigquery_client.create_temporary_dataset(project, location) - - @classmethod - def _execute_query(cls, project, query): - query_job_name = bigquery_tools.generate_bq_job_name( - 'materializing_table_before_reading', - str(uuid.uuid4())[0:10], - bigquery_tools.BigQueryJobTypes.QUERY, - '%s_%s' % (int(time.time()), random.randint(0, 1000))) - cls._setup_temporary_dataset(cls.project, cls.query) - job = cls.bigquery_client._start_query_job( - project, - query, - use_legacy_sql=False, - flatten_results=False, - job_id=query_job_name) - job_ref = job.jobReference - cls.bigquery_client.wait_for_bq_job(job_ref, max_retries=0) - return cls.bigquery_client._get_temp_table(project) - - def test_iobase_source(self): - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - use_fastavro_for_direct_read=True)) - assert_that(result, equal_to(self.TABLE_DATA)) - - def test_iobase_source_without_fastavro(self): - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - use_fastavro_for_direct_read=False)) - assert_that(result, equal_to(self.TABLE_DATA)) - - def test_iobase_source_with_column_selection(self): - EXPECTED_TABLE_DATA = [{ - 'number': 1 - }, { - 'number': 2 - }, { - 'number': 3 - }, { - 'number': 4 - }] - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - selected_fields=['number'])) - assert_that(result, equal_to(EXPECTED_TABLE_DATA)) - - def test_iobase_source_with_row_restriction(self): - EXPECTED_TABLE_DATA = [{ - 'number': 3, 'str': u'你好' - }, { - 'number': 4, 'str': u'привет' - }] - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with 
BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - row_restriction='number > 2')) - assert_that(result, equal_to(EXPECTED_TABLE_DATA)) - - def test_iobase_source_with_column_selection_and_row_restriction(self): - EXPECTED_TABLE_DATA = [{'str': u'你好'}, {'str': u'привет'}] - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - selected_fields=['str'], - row_restriction='number > 2')) - assert_that(result, equal_to(EXPECTED_TABLE_DATA)) - - def test_iobase_source_with_very_selective_filters(self): - with beam.Pipeline(argv=self.args) as p: - result = ( - p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery( - method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, - project=self.temp_table_reference.projectId, - dataset=self.temp_table_reference.datasetId, - table=self.temp_table_reference.tableId, - selected_fields=['str'], - row_restriction='number > 4')) - assert_that(result, equal_to([])) - - class ReadNewTypesTests(BigQueryReadIntegrationTests): @classmethod def setUpClass(cls): diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f4e02b8..170fbff 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -103,7 +103,6 @@ if StrictVersion(_PIP_VERSION) < StrictVersion(REQUIRED_PIP_VERSION): ) ) - REQUIRED_CYTHON_VERSION = '0.28.1' try: _CYTHON_VERSION = get_distribution('cython').version @@ -155,7 +154,7 @@ REQUIRED_PACKAGES = [ 'pytz>=2018.3', 'requests>=2.24.0,<3.0.0', 'typing-extensions>=3.7.0,<4', - ] +] # [BEAM-8181] pyarrow cannot be installed on 32-bit Windows platforms. if sys.platform == 'win32' and sys.maxsize <= 2**32: @@ -178,7 +177,7 @@ REQUIRED_TEST_PACKAGES = [ 'sqlalchemy>=1.3,<2.0', 'psycopg2-binary>=2.8.5,<3.0.0', 'testcontainers>=3.0.3,<4.0.0', - ] +] GCP_REQUIREMENTS = [ 'cachetools>=3.1.0,<5', @@ -186,7 +185,6 @@ GCP_REQUIREMENTS = [ 'google-auth>=1.18.0,<2', 'google-cloud-datastore>=1.8.0,<2', 'google-cloud-pubsub>=0.39.0,<2', - 'google-cloud-bigquery-storage>=2.4.0', # GCP packages required by tests 'google-cloud-bigquery>=1.6.0,<3', 'google-cloud-core>=0.28.1,<2', @@ -223,9 +221,7 @@ INTERACTIVE_BEAM_TEST = [ 'pillow>=7.1.1,<8', ] -AWS_REQUIREMENTS = [ - 'boto3 >=1.9' -] +AWS_REQUIREMENTS = ['boto3 >=1.9'] AZURE_REQUIREMENTS = [ 'azure-storage-blob >=12.3.2', @@ -233,7 +229,6 @@ AZURE_REQUIREMENTS = [ ] - # We must generate protos after setup_requires are installed. def generate_protos_first(original_cmd): try: @@ -245,6 +240,7 @@ def generate_protos_first(original_cmd): def run(self): gen_protos.generate_proto_files() super(cmd, self).run() + return cmd except ImportError: warnings.warn("Could not import gen_protos, skipping proto generation.") @@ -256,8 +252,8 @@ python_requires = '>=3.6' if sys.version_info.major == 3 and sys.version_info.minor >= 9: warnings.warn( 'This version of Apache Beam has not been sufficiently tested on ' - 'Python %s.%s. You may encounter bugs or missing features.' % ( - sys.version_info.major, sys.version_info.minor)) + 'Python %s.%s. You may encounter bugs or missing features.' 
% + (sys.version_info.major, sys.version_info.minor)) setuptools.setup( name=PACKAGE_NAME, @@ -269,9 +265,18 @@ setuptools.setup( author=PACKAGE_AUTHOR, author_email=PACKAGE_EMAIL, packages=setuptools.find_packages(), - package_data={'apache_beam': [ - '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', '*/*.h', '*/*/*.h', - 'testing/data/*.yaml', 'portability/api/*.yaml']}, + package_data={ + 'apache_beam': [ + '*/*.pyx', + '*/*/*.pyx', + '*/*.pxd', + '*/*/*.pxd', + '*/*.h', + '*/*/*.h', + 'testing/data/*.yaml', + 'portability/api/*.yaml' + ] + }, ext_modules=cythonize([ # Make sure to use language_level=3 cython directive in files below. 'apache_beam/**/*.pyx',
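
For readers following this revert: the interface that 92b9977 removes is the one exercised by the deleted ReadUsingStorageApiTests shown in the diff above. The snippet below is a minimal sketch of that now-removed usage, reconstructed from those tests; the pipeline options and the 'my-project', 'my_dataset' and 'my_table' identifiers are placeholders, not values taken from this commit.

    import apache_beam as beam

    # Prior to this revert, ReadFromBigQuery exposed a `method` argument with a
    # DIRECT_READ option that read table rows through the BigQuery Storage Read
    # API instead of running an export job.
    with beam.Pipeline() as p:
        rows = (
            p
            | 'ReadWithStorageApi' >> beam.io.ReadFromBigQuery(
                method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
                project='my-project',           # placeholder project ID
                dataset='my_dataset',           # placeholder dataset ID
                table='my_table',               # placeholder table ID
                selected_fields=['number'],     # optional column projection
                row_restriction='number > 2'))  # optional server-side filter

After this revert, ReadFromBigQuery again supports only the export-based path described in the restored docstring: the table is snapshotted to GCS by a BigQuery export job and the produced Avro files are read back, e.g. beam.io.ReadFromBigQuery(table='my-project:my_dataset.my_table', gcs_location='gs://my-bucket/tmp'), where the table reference and bucket path are likewise placeholders.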