[1/2] incubator-beam git commit: Add support for date partitioned table names
Repository: incubator-beam Updated Branches: refs/heads/python-sdk bb09c07b6 -> 409d067b3 Add support for date partitioned table names These names have the format "tablename$YYYYmmdd". Previously the dollar sign caused this to be deemed invalid. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1af871a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1af871a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1af871a Branch: refs/heads/python-sdk Commit: a1af871a0c8c92a6d84f2e9950615f7737118d7e Parents: bb09c07 Author: Kevin Graney Authored: Tue Dec 6 15:09:42 2016 -0500 Committer: Robert Bradshaw Committed: Wed Dec 21 15:16:45 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 6 -- sdks/python/apache_beam/io/bigquery_test.py | 8 2 files changed, 12 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index ce75e10..2059de4 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -275,7 +275,9 @@ def _parse_table_reference(table, dataset=None, project=None): then the table argument must contain the entire table reference: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a bigquery.TableReference instance in which case dataset and project are - ignored and the reference is returned as a result. + ignored and the reference is returned as a result. Additionally, for date + partitioned tables, appending '$YYYYmmdd' to the table name is supported, + e.g. 'DATASET.TABLE$YYYYmmdd'. dataset: The ID of the dataset containing this table or null if the table reference is specified entirely by the table argument. 
project: The ID of the project containing this table or null if the table @@ -300,7 +302,7 @@ def _parse_table_reference(table, dataset=None, project=None): # table name. if dataset is None: match = re.match( -r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$', table) +r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table) if not match: raise ValueError( 'Expected a table reference (PROJECT:DATASET.TABLE or ' http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index a2cf947..f6f9363 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -208,6 +208,14 @@ class TestBigQuerySource(unittest.TestCase): self.assertEqual(source.query, 'my_query') self.assertIsNone(source.table_reference) + def test_date_partitioned_table_name(self): +source = beam.io.BigQuerySource('dataset.table$20030102', validate=True) +dd = DisplayData.create_from(source) +expected_items = [ +DisplayDataItemMatcher('validation', True), +DisplayDataItemMatcher('table', 'dataset.table$20030102')] +hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + class TestBigQuerySink(unittest.TestCase):
[2/2] incubator-beam git commit: Closes #1534
Closes #1534 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/409d067b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/409d067b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/409d067b Branch: refs/heads/python-sdk Commit: 409d067b36036981e330a055b652bb74a93f4ca2 Parents: bb09c07 a1af871 Author: Robert Bradshaw Authored: Wed Dec 21 15:16:46 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:16:46 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 6 -- sdks/python/apache_beam/io/bigquery_test.py | 8 2 files changed, 12 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3454d691f -> bb09c07b6 Fixing inconsistencies in PipelineOptions The following options have changed: * job_name - Default is 'beamapp-username-date-microseconds'. Test was added. * staging_location and temp_location - staging_location was the default of temp_location. Now it's the other way around, and the tests reflect that. * machine_type alias of worker_machine_type has been removed. * disk_type alias of worker_disk_type has been removed. * disk_source_image option has been removed. * no_save_main_session option has been removed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7 Branch: refs/heads/python-sdk Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1 Parents: 3454d69 Author: Pablo Authored: Tue Dec 6 18:01:54 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:14:52 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index f1341a7..3a9ba46 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -18,6 +18,8 @@ """Dataflow client utility functions.""" import codecs +from datetime import datetime +import getpass import json import logging import os @@ -46,10 +48,6 @@ from apache_beam.utils.options 
import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow -BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' -COMPUTE_API_SERVICE = 'compute.googleapis.com' -STORAGE_API_SERVICE = 'storage.googleapis.com' - class Step(object): """Wrapper for a dataflow Step protobuf.""" @@ -121,11 +119,13 @@ class Environment(object): self.worker_options = options.view_as(WorkerOptions) self.debug_options = options.view_as(DebugOptions) self.proto = dataflow.Environment() -self.proto.clusterManagerApiService = COMPUTE_API_SERVICE -self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE +self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE +self.proto.dataset = '{}/cloud_dataflow'.format( +GoogleCloudOptions.BIGQUERY_API_SERVICE) self.proto.tempStoragePrefix = ( -self.google_cloud_options.temp_location.replace('gs:/', -STORAGE_API_SERVICE)) +self.google_cloud_options.temp_location.replace( +'gs:/', +GoogleCloudOptions.STORAGE_API_SERVICE)) # User agent information. self.proto.userAgent = dataflow.Environment.UserAgentValue() self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint @@ -165,7 +165,7 @@ class Environment(object): dataflow.Package( location='%s/%s' % ( self.google_cloud_options.staging_location.replace( - 'gs:/', STORAGE_API_SERVICE), + 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), package), name=package)) @@ -174,7 +174,7 @@ class Environment(object): packages=package_descriptors, taskrunnerSettings=dataflow.TaskRunnerSettings( parallelWorkerSettings=dataflow.WorkerSettings( -baseUrl='https://dataflow.googleapis.com', +baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, servicePath=self.google_cloud_options.dataflow_endpoint))) pool.autoscalingSettings = dataflow.AutoscalingSettings() # Set worker pool options received through command line. 
@@ -195,8 +195,6 @@ class Environment(object): pool.diskSizeGb = self.worker_options.disk_size_gb if self.worker_options.disk_type: pool.diskType = self.worker_options.disk_type -if self.worker_options.disk_source_image: - pool.diskSourceImage = self.worker_options.disk_source_image if self.worker_options.zone: pool.zone = self.worker_options.zone if self.worker_options.network: @@ -299,10 +297,23 @@ c
[2/2] incubator-beam git commit: Closes #1526
Closes #1526 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b Branch: refs/heads/python-sdk Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba Parents: 3454d69 35e2fdc Author: Robert Bradshaw Authored: Wed Dec 21 15:15:20 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:15:20 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) --
[1/2] incubator-beam git commit: Update Apitools to version 0.5.6
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e26527873 -> 3b4fd5c7d Update Apitools to version 0.5.6 This brings in the fix to https://github.com/google/apitools/pull/136 needed for the BigQuery reader. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63074312 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63074312 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63074312 Branch: refs/heads/python-sdk Commit: 63074312aeb44b5db7f4e914c64864483f6a6510 Parents: e265278 Author: Sourabh Bajaj Authored: Sat Dec 3 09:02:32 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 17:11:58 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63074312/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 033afc7..f6357b6 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -86,7 +86,7 @@ else: REQUIRED_PACKAGES = [ 'avro>=1.7.7,<2.0.0', 'dill>=0.2.5,<0.3', -'google-apitools>=0.5.2,<1.0.0', +'google-apitools>=0.5.6,<1.0.0', 'googledatastore==6.4.1', 'httplib2>=0.8,<0.10', 'mock>=1.0.1,<3.0.0',
[2/2] incubator-beam git commit: Closes #1501
Closes #1501 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4fd5c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4fd5c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4fd5c7 Branch: refs/heads/python-sdk Commit: 3b4fd5c7d962987405dc157e6b84788af61f6413 Parents: e265278 6307431 Author: Robert Bradshaw Authored: Thu Dec 15 17:12:55 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 17:12:55 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Rename PTransform.apply() to PTransform.expand()
44 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -318,7 +318,7 @@ class CombineTest(unittest.TestCase): def test_combine_globally_with_default_side_input(self): class CombineWithSideInput(PTransform): - def apply(self, pcoll): + def expand(self, pcoll): side = pcoll | CombineGlobally(sum).as_singleton_view() main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 523c5a6..0ba1c62 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -598,7 +598,7 @@ class ParDo(PTransformWithSideInputs): label='Transform Function'), 'fn_dd': self.fn} - def apply(self, pcoll): + def expand(self, pcoll): self.side_output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. 
self.dofn = self.fn @@ -641,7 +641,7 @@ class _MultiParDo(PTransform): self._tags = tags self._main_tag = main_tag - def apply(self, pcoll): + def expand(self, pcoll): _ = pcoll | self._do_transform return pvalue.DoOutputsTuple( pcoll.pipeline, self._do_transform, self._tags, self._main_tag) @@ -854,7 +854,7 @@ class CombineGlobally(PTransform): def as_singleton_view(self): return self.clone(as_view=True) - def apply(self, pcoll): + def expand(self, pcoll): def add_input_types(transform): type_hints = self.get_type_hints() if type_hints.input_types: @@ -939,7 +939,7 @@ class CombinePerKey(PTransformWithSideInputs): def process_argspec_fn(self): return self.fn._fn # pylint: disable=protected-access - def apply(self, pcoll): + def expand(self, pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) return pcoll | GroupByKey() | CombineValues('Combine', @@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn) - def apply(self, pcoll): + def expand(self, pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) @@ -1083,7 +1083,7 @@ class GroupByKey(PTransform): timer_window, name, time_domain, fire_time, state): yield wvalue.with_value((k, wvalue.value)) - def apply(self, pcoll): + def expand(self, pcoll): # This code path is only used in the local direct runner. For Dataflow # runner execution, the GroupByKey transform is expanded on the service. 
input_type = pcoll.element_type @@ -1132,7 +1132,7 @@ class GroupByKeyOnly(PTransform): key_type, value_type = trivial_inference.key_value_types(input_type) return KV[key_type, Iterable[value_type]] - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) @@ -1170,7 +1170,7 @@ class Partition(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) - def apply(self, pcoll): + def expand(self, pcoll): n = int(self.args[0]) return pcoll | ParDo( self.ApplyPartitionFnFn(), self.fn, *self.args, @@ -1261,14 +1261,14 @@ class WindowInto(ParDo): def infer_output_type(self, input_type): return input_type - def apply(self, pcoll): + def expand(self, pcoll): input_type = pcoll.element_type if input_type is not None: output_type = input_type self.with_input_types(input_type) self.with_output_types(output_type) -return super(WindowInto, self).apply(pcoll) +return super(WindowInto, self).expand(pcoll) # Python's pickling is broken for nested classes. @@ -1305,7 +1305,7 @@ class Flatten(PTransform): raise ValueError('Input to Flatten must be an iterable.') return pvalueish, pvalueish - def apply(self, pcolls): + def expand(self, pcolls): for pcoll in pcolls: self._check_pcollection(pcoll) return pvalue.PCollection(self.pipeline) @@ -1345,7 +1345,7 @@ class Create(PTransform): else: return Union[[trivial_inference.instance_to_type(v) for v in self.value]] - def apply(self, pbegin): + def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline return pvalue.PCollection(self.pipeline) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform.py -
[2/2] incubator-beam git commit: Closes #1634
Closes #1634 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2652787 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2652787 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2652787 Branch: refs/heads/python-sdk Commit: e2652787355d4c322138f55ae2c54494ec592e59 Parents: d3c8874 e62249a Author: Robert Bradshaw Authored: Thu Dec 15 16:52:39 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 16:52:39 2016 -0800 -- sdks/python/README.md | 2 +- .../examples/complete/autocomplete.py | 2 +- .../examples/complete/estimate_pi.py| 2 +- .../apache_beam/examples/complete/tfidf.py | 2 +- .../examples/complete/top_wikipedia_sessions.py | 6 ++--- .../examples/cookbook/custom_ptransform.py | 2 +- .../examples/cookbook/multiple_output_pardo.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 16 ++--- .../examples/snippets/snippets_test.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- sdks/python/apache_beam/io/avroio.py| 4 ++-- .../apache_beam/io/datastore/v1/datastoreio.py | 4 ++-- sdks/python/apache_beam/io/iobase.py| 6 ++--- sdks/python/apache_beam/io/textio.py| 4 ++-- sdks/python/apache_beam/pipeline_test.py| 4 ++-- .../runners/dataflow/native_io/iobase.py| 2 +- .../apache_beam/runners/direct/direct_runner.py | 2 +- sdks/python/apache_beam/runners/runner.py | 4 ++-- sdks/python/apache_beam/transforms/combiners.py | 14 ++-- .../apache_beam/transforms/combiners_test.py| 2 +- sdks/python/apache_beam/transforms/core.py | 24 ++-- .../python/apache_beam/transforms/ptransform.py | 10 .../apache_beam/transforms/ptransform_test.py | 6 ++--- .../python/apache_beam/transforms/sideinputs.py | 10 sdks/python/apache_beam/transforms/util.py | 4 ++-- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py| 2 +- 27 files changed, 71 insertions(+), 71 deletions(-) --
[2/2] incubator-beam git commit: Closes #1617
Closes #1617 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3c88748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3c88748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3c88748 Branch: refs/heads/python-sdk Commit: d3c88748099fcccb27aef67c5c390d0bc67ebeb0 Parents: e383c77 0a558c7 Author: Robert Bradshaw Authored: Thu Dec 15 16:35:59 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 16:35:59 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Update the BQ export flat from Json to Avro
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e383c7715 -> d3c887480 Update the BQ export flat from Json to Avro Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a558c71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a558c71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a558c71 Branch: refs/heads/python-sdk Commit: 0a558c7171d6e4452d88ecffd16a024a19cbfc42 Parents: e383c77 Author: Sourabh Bajaj Authored: Wed Dec 14 11:44:46 2016 -0800 Committer: Sourabh Bajaj Committed: Wed Dec 14 11:44:46 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a558c71/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 8b953b0..a3f7d94 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': - step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO') # TODO(silviuc): Add table validation if transform.source.validate. if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET,
[2/2] incubator-beam git commit: Closes #1591
Closes #1591 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e383c771 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e383c771 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e383c771 Branch: refs/heads/python-sdk Commit: e383c77151bcfb61e3394c5dd040a425aa246bec Parents: f086afe e36ee8d Author: Robert Bradshaw Authored: Tue Dec 13 11:09:13 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 13 11:09:13 2016 -0800 -- sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Do not test pickling native sink objects
Repository: incubator-beam Updated Branches: refs/heads/python-sdk f086afe12 -> e383c7715 Do not test pickling native sink objects Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e36ee8d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e36ee8d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e36ee8d7 Branch: refs/heads/python-sdk Commit: e36ee8d797965e2e2101cce046a14399f2008fc6 Parents: f086afe Author: Ahmet Altay Authored: Mon Dec 12 19:09:37 2016 -0800 Committer: Ahmet Altay Committed: Mon Dec 12 19:09:37 2016 -0800 -- sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e36ee8d7/sdks/python/apache_beam/runners/direct/transform_evaluator.py -- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 7a9a31f..24ab754 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -505,8 +505,7 @@ class _NativeWriteEvaluator(_TransformEvaluator): side_inputs) assert applied_ptransform.transform.sink -# TODO(aaltay): Consider storing the serialized form as an optimization. -self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink)) +self._sink = applied_ptransform.transform.sink @property def _is_final_bundle(self):
[1/2] incubator-beam git commit: [BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit
Repository: incubator-beam Updated Branches: refs/heads/python-sdk b265dceaa -> f086afe12 [BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a70e581 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a70e581 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a70e581 Branch: refs/heads/python-sdk Commit: 0a70e581bef4258dfb18f3c5db8f9a9369ab13e8 Parents: b265dce Author: Mark Liu Authored: Fri Dec 9 16:55:45 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 12 14:19:57 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a70e581/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index ba3553a..f410230 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -176,7 +176,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) pipeline.run() - @attr('ValidatesRunner') + # @attr('ValidatesRunner') + # TODO(BEAM-1124): Temporarily disable it due to test failed running on + # Dataflow service. def test_multi_valued_singleton_side_input(self): pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2])
[2/2] incubator-beam git commit: Closes #1571
Closes #1571 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f086afe1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f086afe1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f086afe1 Branch: refs/heads/python-sdk Commit: f086afe125cc413e7626ab186268d5bb3c067a89 Parents: b265dce 0a70e58 Author: Robert Bradshaw Authored: Mon Dec 12 14:19:58 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 12 14:19:58 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Add more documentation to datastore_wordcount example
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 8eae855d6 -> b265dceaa Add more documentation to datastore_wordcount example Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/62b8095e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/62b8095e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/62b8095e Branch: refs/heads/python-sdk Commit: 62b8095e7164a316b8ae93c7fefa41d38ee255a8 Parents: 8eae855 Author: Vikas Kedigehalli Authored: Wed Dec 7 14:14:41 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:29:02 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 46 +++- 1 file changed, 44 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/62b8095e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index eb62614..9613402 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -15,7 +15,49 @@ # limitations under the License. # -"""A word-counting workflow that uses Google Cloud Datastore.""" +"""A word-counting workflow that uses Google Cloud Datastore. + +This example shows how to use ``datastoreio`` to read from and write to +Google Cloud Datastore. Note that running this example may incur charge for +Cloud Datastore operations. + +See https://developers.google.com/datastore/ for more details on Google Cloud +Datastore. +See http://beam.incubator.apache.org/get-started/quickstart on +how to run a Beam pipeline. + +Read-only Mode: In this mode, this example reads Cloud Datastore entities using +the ``datastoreio.ReadFromDatastore`` transform, extracts the words, +counts them and write the output to a set of files. 
+ +The following options must be provided to run this pipeline in read-only mode: +`` +--project YOUR_PROJECT_ID +--kind YOUR_DATASTORE_KIND +--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH] +--read-only +`` + +Read-write Mode: In this mode, this example reads words from an input file, +converts them to Cloud Datastore ``Entity`` objects and writes them to +Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline +will then read these Cloud Datastore entities using the +``datastoreio.ReadFromDatastore`` transform, extract the words, count them and +write the output to a set of files. + +The following options must be provided to run this pipeline in read-write mode: +`` +--project YOUR_PROJECT_ID +--kind YOUR_DATASTORE_KIND +--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH] +`` + +Note: We are using the Cloud Datastore protobuf objects directly because +that is the interface that the ``datastoreio`` exposes. +See the following links on more information about these protobuf messages. +https://cloud.google.com/datastore/docs/reference/rpc/google.datastore.v1 and +https://github.com/googleapis/googleapis/tree/master/google/datastore/v1 +""" from __future__ import absolute_import @@ -196,7 +238,7 @@ def run(argv=None): if not known_args.read_only: write_to_datastore(gcloud_options.project, known_args, pipeline_options) - # Read from Datastore. + # Read entities from Datastore. result = read_from_datastore(gcloud_options.project, known_args, pipeline_options)
[2/2] incubator-beam git commit: Closes #1540
Closes #1540 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b265dcea Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b265dcea Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b265dcea Branch: refs/heads/python-sdk Commit: b265dceaac93a50543151efb4c3168a8275e8b2d Parents: 8eae855 62b8095 Author: Robert Bradshaw Authored: Fri Dec 9 11:29:03 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:29:03 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 46 +++- 1 file changed, 44 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Closes #1551
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 44c1586f3 -> 8eae855d6 Closes #1551 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8eae855d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8eae855d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8eae855d Branch: refs/heads/python-sdk Commit: 8eae855d608abb140c0e3ece3927765575b81cbb Parents: 44c1586 2dee686 Author: Robert Bradshaw Authored: Fri Dec 9 11:28:02 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:28:02 2016 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) --
[2/2] incubator-beam git commit: [BEAM-1109] Fix Python Postcommit Test Timeout
[BEAM-1109] Fix Python Postcommit Test Timeout Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2dee6868 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2dee6868 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2dee6868 Branch: refs/heads/python-sdk Commit: 2dee68683ad6d18161d22cd71f8d90768d7fcb30 Parents: 44c1586 Author: Mark Liu Authored: Thu Dec 8 00:06:06 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:28:02 2016 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2dee6868/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 4da078f..2e00a03 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -71,11 +71,8 @@ python setup.py sdist SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz) # Run ValidatesRunner tests on Google Cloud Dataflow service -# processes -> number of processes to run tests in parallel -# process-timeout -> test timeout in seconds python setup.py nosetests \ - -a ValidatesRunner --processes=4 --process-timeout=360 \ - --test-pipeline-options=" \ + -a ValidatesRunner --test-pipeline-options=" \ --runner=BlockingDataflowPipelineRunner \ --project=$PROJECT \ --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
[2/2] incubator-beam git commit: Fix a typo in query split error handling
Fix a typo in query split error handling Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906 Branch: refs/heads/python-sdk Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e Parents: 1392f70 Author: Vikas Kedigehalli Authored: Wed Dec 7 14:19:26 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:22:31 2016 -0800 -- .../apache_beam/io/datastore/v1/datastoreio.py | 2 +- .../io/datastore/v1/datastoreio_test.py | 29 2 files changed, 30 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index fc3e813..a86bb0b 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform): except Exception: logging.warning("Unable to parallelize the given query: %s", query, exc_info=True) -query_splits = [(key, query)] +query_splits = [query] sharded_query_splits = [] for split_query in query_splits: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py index 2ac7ffb..f80a320 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py @@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase): self.assertEqual(1, len(returned_split_queries)) self.assertEqual(0, len(self._mock_datastore.method_calls)) + def 
test_SplitQueryFn_with_exception(self): +"""A test that verifies that no split is performed when failures occur.""" +with patch.object(helper, 'get_datastore', + return_value=self._mock_datastore): + # Force SplitQueryFn to compute the number of query splits + num_splits = 0 + expected_num_splits = 1 + entity_bytes = (expected_num_splits * + ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES) + with patch.object(ReadFromDatastore, 'get_estimated_size_bytes', +return_value=entity_bytes): + +with patch.object(query_splitter, 'get_splits', + side_effect=ValueError("Testing query split error")): + split_query_fn = ReadFromDatastore.SplitQueryFn( + self._PROJECT, self._query, None, num_splits) + mock_context = MagicMock() + mock_context.element = self._query + split_query_fn.start_bundle(mock_context) + returned_split_queries = [] + for split_query in split_query_fn.process(mock_context): +returned_split_queries.append(split_query) + + self.assertEqual(len(returned_split_queries), expected_num_splits) + self.assertEqual(returned_split_queries[0][1], self._query) + self.assertEqual(0, + len(self._mock_datastore.run_query.call_args_list)) + self.verify_unique_keys(returned_split_queries) + def test_DatastoreWriteFn_with_emtpy_batch(self): self.check_DatastoreWriteFn(0)
[1/2] incubator-beam git commit: Closes #1542
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 1392f70b6 -> 44c1586f3 Closes #1542 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44c1586f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44c1586f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44c1586f Branch: refs/heads/python-sdk Commit: 44c1586f3815f957a43037309f2a46a1766c6516 Parents: 1392f70 d6afb90 Author: Robert Bradshaw Authored: Thu Dec 8 11:22:31 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:22:31 2016 -0800 -- .../apache_beam/io/datastore/v1/datastoreio.py | 2 +- .../io/datastore/v1/datastoreio_test.py | 29 2 files changed, 30 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Handle empty batches in GcsIO batch methods
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 75be6e974 -> 1392f70b6 Handle empty batches in GcsIO batch methods Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ef83b33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ef83b33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ef83b33 Branch: refs/heads/python-sdk Commit: 3ef83b3396a4574b3283b29ba1f878b31badd612 Parents: 75be6e9 Author: Charles Chen Authored: Wed Dec 7 15:03:01 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:18:51 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 4 sdks/python/apache_beam/io/gcsio_test.py | 4 2 files changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio.py -- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 748465f..f150c4c 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -204,6 +204,8 @@ class GcsIO(object): argument, where exception is None if the operation succeeded or the relevant exception if the operation failed. """ +if not paths: + return [] batch_request = BatchApiRequest( retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) for path in paths: @@ -264,6 +266,8 @@ class GcsIO(object): src_dest_pairs argument, where exception is None if the operation succeeded or the relevant exception if the operation failed. 
""" +if not src_dest_pairs: + return [] batch_request = BatchApiRequest( retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) for src, dest in src_dest_pairs: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio_test.py -- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 5af13c6..bd7eb51 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -265,6 +265,10 @@ class TestGCSIO(unittest.TestCase): with self.assertRaises(ValueError): self.gcs.open(file_name, 'r+b') + def test_empty_batches(self): +self.assertEqual([], self.gcs.copy_batch([])) +self.assertEqual([], self.gcs.delete_batch([])) + def test_delete(self): file_name = 'gs://gcsio-test/delete_me' file_size = 1024
[2/2] incubator-beam git commit: Closes #1544
Closes #1544 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1392f70b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1392f70b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1392f70b Branch: refs/heads/python-sdk Commit: 1392f70b6ee2db76e333fad16efe0cfdd00e7175 Parents: 75be6e9 3ef83b3 Author: Robert Bradshaw Authored: Thu Dec 8 11:18:52 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:18:52 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 4 sdks/python/apache_beam/io/gcsio_test.py | 4 2 files changed, 8 insertions(+) --
[1/2] incubator-beam git commit: Add reference to the >> and | operators for pipelines.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 43057960a -> 75be6e974 Add reference to the >> and | operators for pipelines. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3510ff99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3510ff99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3510ff99 Branch: refs/heads/python-sdk Commit: 3510ff99da9cd149e67e8fdb12b942689374b2d7 Parents: 4305796 Author: Maria Garcia Herrero Authored: Tue Dec 6 21:05:47 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:14:51 2016 -0800 -- sdks/python/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3510ff99/sdks/python/README.md -- diff --git a/sdks/python/README.md b/sdks/python/README.md index cff497c..820084d 100644 --- a/sdks/python/README.md +++ b/sdks/python/README.md @@ -163,8 +163,9 @@ The following examples demonstrate some basic, fundamental concepts for using Ap A basic pipeline will take as input an iterable, apply the beam.Create `PTransform`, and produce a `PCollection` that can -be written to a file or modified by further `PTransform`s. The -pipe operator allows to chain `PTransform`s. +be written to a file or modified by further `PTransform`s. +The `>>` operator is used to label `PTransform`s and +the `|` operator is used to chain them. ```python # Standard imports
[2/2] incubator-beam git commit: Closes #1521
Closes #1521 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75be6e97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75be6e97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75be6e97 Branch: refs/heads/python-sdk Commit: 75be6e974831f73ea935fe1f52fd7091a03c8928 Parents: 4305796 3510ff9 Author: Robert Bradshaw Authored: Thu Dec 8 11:14:52 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:14:52 2016 -0800 -- sdks/python/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fix template_runner_test on Windows
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 4a660c604 -> 43057960a Fix template_runner_test on Windows Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a565ca10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a565ca10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a565ca10 Branch: refs/heads/python-sdk Commit: a565ca1008309564dba41d551c68ea553cd83a7b Parents: 4a660c6 Author: Charles Chen Authored: Wed Dec 7 16:09:49 2016 -0800 Committer: Charles Chen Committed: Wed Dec 7 16:09:49 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 14 ++ 1 file changed, 10 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a565ca10/sdks/python/apache_beam/runners/template_runner_test.py -- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py index a141521..cc3d7c2 100644 --- a/sdks/python/apache_beam/runners/template_runner_test.py +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -33,24 +33,30 @@ from apache_beam.internal import apiclient class TemplatingDataflowPipelineRunnerTest(unittest.TestCase): """TemplatingDataflow tests.""" def test_full_completion(self): -dummy_file = tempfile.NamedTemporaryFile() +# Create dummy file and close it. Note that we need to do this because +# Windows does not allow NamedTemporaryFiles to be reopened elsewhere +# before the temporary file is closed. 
+dummy_file = tempfile.NamedTemporaryFile(delete=False) +dummy_file_name = dummy_file.name +dummy_file.close() + dummy_dir = tempfile.mkdtemp() remote_runner = DataflowPipelineRunner() pipeline = Pipeline(remote_runner, options=PipelineOptions([ '--dataflow_endpoint=ignored', -'--sdk_location=' + dummy_file.name, +'--sdk_location=' + dummy_file_name, '--job_name=test-job', '--project=test-project', '--staging_location=' + dummy_dir, '--temp_location=/dev/null', -'--template_location=' + dummy_file.name, +'--template_location=' + dummy_file_name, '--no_auth=True'])) pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned pipeline.run() -with open(dummy_file.name) as template_file: +with open(dummy_file_name) as template_file: saved_job_dict = json.load(template_file) self.assertEqual( saved_job_dict['environment']['sdkPipelineOptions']
[2/2] incubator-beam git commit: Closes #1548
Closes #1548 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/43057960 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/43057960 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/43057960 Branch: refs/heads/python-sdk Commit: 43057960af906ff5c435ea016d5f6df5bccaea40 Parents: 4a660c6 a565ca1 Author: Robert Bradshaw Authored: Wed Dec 7 18:17:39 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 7 18:17:39 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 14 ++ 1 file changed, 10 insertions(+), 4 deletions(-) --
[2/2] incubator-beam git commit: [BEAM-1077] @ValidatesRunner Test in Python Postcommit
[BEAM-1077] @ValidatesRunner Test in Python Postcommit Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7626ad2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7626ad2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7626ad2 Branch: refs/heads/python-sdk Commit: c7626ad2f616402dc23809f628d515bcd50f3247 Parents: d5e8c79 Author: Mark Liu Authored: Fri Dec 2 13:58:39 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 7 12:14:38 2016 -0800 -- sdks/python/run_postcommit.sh | 36 ++-- 1 file changed, 26 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7626ad2/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 2cd40da..4da078f 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -54,29 +54,45 @@ echo ">>> RUNNING DIRECT RUNNER py-wordcount" python -m apache_beam.examples.wordcount --output /tmp/py-wordcount-direct # TODO: check that output file is generated for Direct Runner. -# Run wordcount on the service. +# Run tests on the service. # Where to store wordcount output. 
GCS_LOCATION=gs://temp-storage-for-end-to-end-tests # Job name needs to be unique -JOBNAME=py-wordcount-`date +%s` +JOBNAME_E2E=py-wordcount-`date +%s` +JOBNAME_VR_TEST=py-validatesrunner-`date +%s` PROJECT=apache-beam-testing # Create a tarball python setup.py sdist +SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz) + +# Run ValidatesRunner tests on Google Cloud Dataflow service +# processes -> number of processes to run tests in parallel +# process-timeout -> test timeout in seconds +python setup.py nosetests \ + -a ValidatesRunner --processes=4 --process-timeout=360 \ + --test-pipeline-options=" \ +--runner=BlockingDataflowPipelineRunner \ +--project=$PROJECT \ +--staging_location=$GCS_LOCATION/staging-validatesrunner-test \ +--sdk_location=$SDK_LOCATION \ +--job_name=$JOBNAME_VR_TEST \ +--num_workers=1" + # Run wordcount on the Google Cloud Dataflow service python -m apache_beam.examples.wordcount \ ---output $GCS_LOCATION/py-wordcount-cloud \ ---staging_location $GCS_LOCATION/staging-wordcount \ ---temp_location $GCS_LOCATION/temp-wordcount \ ---runner BlockingDataflowPipelineRunner \ ---job_name $JOBNAME \ ---project $PROJECT \ ---sdk_location dist/apache-beam-sdk-*.tar.gz \ ---num_workers 1 >> job_output 2>&1 || true; + --output $GCS_LOCATION/py-wordcount-cloud \ + --staging_location $GCS_LOCATION/staging-wordcount \ + --temp_location $GCS_LOCATION/temp-wordcount \ + --runner BlockingDataflowPipelineRunner \ + --job_name $JOBNAME_E2E \ + --project $PROJECT \ + --sdk_location $SDK_LOCATION \ + --num_workers 1 >> job_output 2>&1 || true; # Print full job output, validate correct, then remove it. echo ">>> JOB OUTPUT FOLLOWS"
[1/2] incubator-beam git commit: Closes #1492
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d5e8c79a3 -> 4a660c604 Closes #1492 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a660c60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a660c60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a660c60 Branch: refs/heads/python-sdk Commit: 4a660c604a0b21f685d87b8ecc008aeb13bb4049 Parents: d5e8c79 c7626ad Author: Robert Bradshaw Authored: Wed Dec 7 12:14:38 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 7 12:14:38 2016 -0800 -- sdks/python/run_postcommit.sh | 36 ++-- 1 file changed, 26 insertions(+), 10 deletions(-) --
[2/2] incubator-beam git commit: Closes #1485
Closes #1485 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f19f767b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f19f767b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f19f767b Branch: refs/heads/python-sdk Commit: f19f767b09275bdea325bb37a3767d96eeacd4a0 Parents: 6dcc429 aef4858 Author: Robert Bradshaw Authored: Tue Dec 6 10:14:11 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:14:11 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 19 +-- sdks/python/apache_beam/internal/pickler.py | 8 2 files changed, 25 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fix the pickle issue with the inconsistency of dill load and dump session
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6dcc429e5 -> f19f767b0 Fix the pickle issue with the inconsistency of dill load and dump session Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aef4858b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aef4858b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aef4858b Branch: refs/heads/python-sdk Commit: aef4858b80dfacf3401e6672b9373c82a8e77027 Parents: 6dcc429 Author: Sourabh Bajaj Authored: Fri Dec 2 15:02:18 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:14:10 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 19 +-- sdks/python/apache_beam/internal/pickler.py | 8 2 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index c5f5f70..f1341a7 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -46,7 +46,6 @@ from apache_beam.utils.options import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow - BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' COMPUTE_API_SERVICE = 'compute.googleapis.com' STORAGE_API_SERVICE = 'storage.googleapis.com' @@ -55,13 +54,19 @@ STORAGE_API_SERVICE = 'storage.googleapis.com' class Step(object): """Wrapper for a dataflow Step protobuf.""" - def __init__(self, step_kind, step_name): + def __init__(self, step_kind, step_name, additional_properties=None): self.step_kind = step_kind self.step_name = step_name self.proto = dataflow.Step(kind=step_kind, name=step_name) self.proto.properties = {} +self._additional_properties = [] + +if additional_properties is not None: + for (n, v, t) 
in additional_properties: +self.add_property(n, v, t) def add_property(self, name, value, with_type=False): +self._additional_properties.append((name, value, with_type)) self.proto.properties.additionalProperties.append( dataflow.Step.PropertiesValue.AdditionalProperty( key=name, value=to_json_value(value, with_type=with_type))) @@ -77,6 +82,11 @@ class Step(object): outputs.append(entry_prop.value.string_value) return outputs + def __reduce__(self): +"""Reduce hook for pickling the Step class more easily +""" +return (Step, (self.step_kind, self.step_name, self._additional_properties)) + def get_output(self, tag=None): """Returns name if it is one of the outputs or first output if name is None. @@ -330,6 +340,11 @@ class Job(object): def json(self): return encoding.MessageToJson(self.proto) + def __reduce__(self): +"""Reduce hook for pickling the Job class more easily +""" +return (Job, (self.options,)) + class DataflowApplicationClient(object): """A Dataflow API client used by application code to create and query jobs.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/pickler.py -- diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 30f0b77..d39a497 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -204,6 +204,14 @@ def loads(encoded): def dump_session(file_path): + """Pickle the current python session to be used in the worker. + + Note: Due to the inconsistency in the first dump of dill dump_session we + create and load the dump twice to have consistent results in the worker and + the running session. Check: https://github.com/uqfoundation/dill/issues/195 + """ + dill.dump_session(file_path) + dill.load_session(file_path) return dill.dump_session(file_path)
[2/2] incubator-beam git commit: Closes #1512
Closes #1512 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dcc429e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dcc429e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dcc429e Branch: refs/heads/python-sdk Commit: 6dcc429e5bf4bb37605773be6de31efa3f887093 Parents: e73bdb5 4a02a68 Author: Robert Bradshaw Authored: Tue Dec 6 10:10:45 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:10:45 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 82 +++ sdks/python/apache_beam/template_runner_test.py | 83 2 files changed, 82 insertions(+), 83 deletions(-) --
[1/2] incubator-beam git commit: Move template_runner_test to runners folder.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e73bdb5c2 -> 6dcc429e5 Move template_runners_test to runners folder. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a02a688 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a02a688 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a02a688 Branch: refs/heads/python-sdk Commit: 4a02a688dbacd640e40d7b3d3ca268fde9806e73 Parents: e73bdb5 Author: Ahmet Altay Authored: Mon Dec 5 14:57:14 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:10:44 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 82 +++ sdks/python/apache_beam/template_runner_test.py | 83 2 files changed, 82 insertions(+), 83 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/runners/template_runner_test.py -- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py new file mode 100644 index 000..a141521 --- /dev/null +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for templated pipelines.""" + +from __future__ import absolute_import + +import json +import unittest +import tempfile + +import apache_beam as beam +from apache_beam.pipeline import Pipeline +from apache_beam.runners.dataflow_runner import DataflowPipelineRunner +from apache_beam.utils.options import PipelineOptions +from apache_beam.internal import apiclient + + +class TemplatingDataflowPipelineRunnerTest(unittest.TestCase): + """TemplatingDataflow tests.""" + def test_full_completion(self): +dummy_file = tempfile.NamedTemporaryFile() +dummy_dir = tempfile.mkdtemp() + +remote_runner = DataflowPipelineRunner() +pipeline = Pipeline(remote_runner, +options=PipelineOptions([ +'--dataflow_endpoint=ignored', +'--sdk_location=' + dummy_file.name, +'--job_name=test-job', +'--project=test-project', +'--staging_location=' + dummy_dir, +'--temp_location=/dev/null', +'--template_location=' + dummy_file.name, +'--no_auth=True'])) + +pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned +pipeline.run() +with open(dummy_file.name) as template_file: + saved_job_dict = json.load(template_file) + self.assertEqual( + saved_job_dict['environment']['sdkPipelineOptions'] + ['options']['project'], 'test-project') + self.assertEqual( + saved_job_dict['environment']['sdkPipelineOptions'] + ['options']['job_name'], 'test-job') + + def test_bad_path(self): +dummy_sdk_file = tempfile.NamedTemporaryFile() +remote_runner = DataflowPipelineRunner() +pipeline = Pipeline(remote_runner, +options=PipelineOptions([ +'--dataflow_endpoint=ignored', +'--sdk_location=' + dummy_sdk_file.name, +'--job_name=test-job', +'--project=test-project', +'--staging_location=ignored', +'--temp_location=/dev/null', +'--template_location=/bad/path', +'--no_auth=True'])) +remote_runner.job = apiclient.Job(pipeline.options) + +with self.assertRaises(IOError): + pipeline.run() + + +if __name__ == '__main__': + unittest.main() 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/template_runner_test.py -
[1/2] incubator-beam git commit: Change export format to AVRO for BQ
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d59bccd82 -> eb98d636e Change export format to AVRO for BQ Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/390fbfdd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/390fbfdd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/390fbfdd Branch: refs/heads/python-sdk Commit: 390fbfddf444df410f7cb61329496f6f24a0532c Parents: d59bccd Author: Sourabh Bajaj Authored: Mon Dec 5 14:04:54 2016 -0800 Committer: Sourabh Bajaj Committed: Mon Dec 5 14:04:54 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/390fbfdd/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 8b953b0..a3f7d94 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': - step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO') # TODO(silviuc): Add table validation if transform.source.validate. if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET,
[2/2] incubator-beam git commit: Closes #1510
Closes #1510 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb98d636 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb98d636 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb98d636 Branch: refs/heads/python-sdk Commit: eb98d636ec9bbe11795aa9ee2fea1a8ceddaf794 Parents: d59bccd 390fbfd Author: Robert Bradshaw Authored: Mon Dec 5 17:10:34 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 17:10:34 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[2/2] incubator-beam git commit: Closes #1509
Closes #1509 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d59bccd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d59bccd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d59bccd8 Branch: refs/heads/python-sdk Commit: d59bccd82461d340613a16ab41db2a4cc6e4200b Parents: f7118c8 f1b83f7 Author: Robert Bradshaw Authored: Mon Dec 5 13:01:19 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 13:01:19 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Add missing job parameter to the submit_job_description.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk f7118c8a5 -> d59bccd82 Add missing job parameter to the submit_job_description. Tested post commit test locally. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1b83f7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1b83f7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1b83f7e Branch: refs/heads/python-sdk Commit: f1b83f7e82b56cd36bffbcdd5cc8ab319bf1e9d3 Parents: f7118c8 Author: Ahmet Altay Authored: Mon Dec 5 12:29:45 2016 -0800 Committer: Ahmet Altay Committed: Mon Dec 5 12:29:45 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1b83f7e/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index a894557..c5f5f70 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -412,7 +412,7 @@ class DataflowApplicationClient(object): self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) if not template_location: - return self.submit_job_description() + return self.submit_job_description(job) else: return None @@ -426,7 +426,7 @@ class DataflowApplicationClient(object): # TODO(silviuc): Remove the debug logging eventually. logging.info('JOB: %s', job) - def submit_job_description(self): + def submit_job_description(self, job): """Creates and excutes a job request.""" request = dataflow.DataflowProjectsJobsCreateRequest() request.projectId = self.google_cloud_options.project
[1/2] incubator-beam git commit: Modify create_job to allow staging the job and not submitting it to the service.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 7c5e4aa66 -> 0d99856f3 Modify create_job to allow staging the job and not submitting it to the service. - Modularize create_job in create job description, stage job, and send for execution. - Modify --dataflow_job_file to stage the job and continue submitting it to the service. - Add --template_location to stage the job but not submit it to the service. - Add tests for both, including making them mutually exclusive (following Java SDK decision). - Add template_runner_test.py with integration tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfa0ad81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfa0ad81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfa0ad81 Branch: refs/heads/python-sdk Commit: cfa0ad8136b323bade9de14ea6149e7f74cbd0b4 Parents: 7c5e4aa Author: Maria Garcia Herrero Authored: Wed Nov 2 09:14:48 2016 -0700 Committer: Robert Bradshaw Committed: Mon Dec 5 11:04:34 2016 -0800 -- sdks/python/apache_beam/examples/wordcount.py | 1 - sdks/python/apache_beam/internal/apiclient.py | 34 +++- .../apache_beam/runners/dataflow_runner.py | 13 ++- sdks/python/apache_beam/template_runner_test.py | 83 sdks/python/apache_beam/utils/options.py| 10 +++ .../apache_beam/utils/pipeline_options_test.py | 13 +++ .../utils/pipeline_options_validator_test.py| 28 +++ 7 files changed, 175 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/examples/wordcount.py -- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 096e508..7f347d8 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -59,7 +59,6 @@ class WordExtractingDoFn(beam.DoFn): def run(argv=None): """Main entry point; defines and runs 
the wordcount pipeline.""" - parser = argparse.ArgumentParser() parser.add_argument('--input', dest='input', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 5612631..a894557 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -24,6 +24,7 @@ import os import re import time +from StringIO import StringIO from apitools.base.py import encoding from apitools.base.py import exceptions @@ -42,7 +43,6 @@ from apache_beam.utils.options import DebugOptions from apache_beam.utils.options import GoogleCloudOptions from apache_beam.utils.options import StandardOptions from apache_beam.utils.options import WorkerOptions - from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow @@ -327,6 +327,9 @@ class Job(object): self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$') self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$') + def json(self): +return encoding.MessageToJson(self.proto) + class DataflowApplicationClient(object): """A Dataflow API client used by application code to create and query jobs.""" @@ -392,8 +395,29 @@ class DataflowApplicationClient(object): # TODO(silviuc): Refactor so that retry logic can be applied. @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): -"""Submits for remote execution a job described by the workflow proto.""" -# Stage job resources and add an environment proto with their paths. +"""Creates a job description. +Additionally, it may stage it and/or submit it for remote execution. 
+""" +self.create_job_description(job) + +# Stage and submit the job when necessary +dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file +template_location = ( +job.options.view_as(GoogleCloudOptions).template_location) + +job_location = template_location or dataflow_job_file +if job_location: + gcs_or_local_path = os.path.dirname(job_location) + file_name = os.path.basename(job_location) + self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) + +if not template_location: + return self.submit_job_description() +else: + return None + + def create_job_description(self, job): +"""Creates a job described by the workflow proto.""" resources = dependency
[2/2] incubator-beam git commit: Closes #1342
Closes #1342 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d99856f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d99856f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d99856f Branch: refs/heads/python-sdk Commit: 0d99856f37d6bca9bb8d676ae36157bd0515a4f2 Parents: 7c5e4aa cfa0ad8 Author: Robert Bradshaw Authored: Mon Dec 5 11:04:35 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 11:04:35 2016 -0800 -- sdks/python/apache_beam/examples/wordcount.py | 1 - sdks/python/apache_beam/internal/apiclient.py | 34 +++- .../apache_beam/runners/dataflow_runner.py | 13 ++- sdks/python/apache_beam/template_runner_test.py | 83 sdks/python/apache_beam/utils/options.py| 10 +++ .../apache_beam/utils/pipeline_options_test.py | 13 +++ .../utils/pipeline_options_validator_test.py| 28 +++ 7 files changed, 175 insertions(+), 7 deletions(-) --
[2/2] incubator-beam git commit: Removing a bug in .travis.yml that makes the build fail.
Removing a bug in .travis.yml that makes the build fail. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90db7908 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90db7908 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90db7908 Branch: refs/heads/python-sdk Commit: 90db7908b807cb752c23c445b220b3d5dd08b36b Parents: 9a175a5 Author: Pablo Authored: Tue Nov 29 13:58:55 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:15:41 2016 -0800 -- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90db7908/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index 3080341..470d2fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ before_install: install: - if [ ! "$TEST_PYTHON" ]; then travis_retry mvn -B install clean -U -DskipTests=true; fi - if [ "$TEST_PYTHON" ] && pip list | grep tox; then TOX_FILE=`which tox` ; export TOX_HOME=`dirname $TOX_FILE`; fi - - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip install tox --user `whoami`; fi + - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip install tox --user; fi # Removing this here protects from inadvertent caching - rm -rf "$HOME/.m2/repository/org/apache/beam"
[1/2] incubator-beam git commit: Closes #1456
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 9a175a5fe -> 7c5e4aa66 Closes #1456 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c5e4aa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c5e4aa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c5e4aa6 Branch: refs/heads/python-sdk Commit: 7c5e4aa66c3916b98cf7ecf932f11c2b057e1858 Parents: 9a175a5 90db790 Author: Robert Bradshaw Authored: Fri Dec 2 22:15:41 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:15:41 2016 -0800 -- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Call from_p12_keyfile() with the correct arguments.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 7c0bf25fa -> 9a175a5fe Call from_p12_keyfile() with the correct arguments. This code path is failing, because a wrong list of arguments is passed. Fixing that uncovered that oauth2client depends on pyOpenSSL for this call to work. I did not add this dependency to setup.py because, it does not install cleanly in all environments. As a workaround, users who would like to use this authentication method could first do a 'pip install pyOpenSSL'. I added a test, that skips if 'pyOpenSSL' is not installed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f23b717e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f23b717e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f23b717e Branch: refs/heads/python-sdk Commit: f23b717e7e433c30c0acee0fea8d179e6343b8b8 Parents: 7c0bf25 Author: Ahmet Altay Authored: Fri Dec 2 13:38:31 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:14:32 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 6 +-- sdks/python/apache_beam/internal/auth_test.py | 44 +++ sdks/python/apache_beam/tests/data/README.md| 20 + .../apache_beam/tests/data/privatekey.p12 | Bin 0 -> 2452 bytes sdks/python/setup.py| 2 +- 5 files changed, 68 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth.py -- diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py index a043fcf..056f40c 100644 --- a/sdks/python/apache_beam/internal/auth.py +++ b/sdks/python/apache_beam/internal/auth.py @@ -133,6 +133,7 @@ def get_service_credentials(): 'https://www.googleapis.com/auth/datastore' ] +# TODO(BEAM-1068): Do not recreate options from sys.argv. # We are currently being run from the command line. 
google_cloud_options = PipelineOptions( sys.argv).view_as(GoogleCloudOptions) @@ -151,8 +152,8 @@ def get_service_credentials(): return ServiceAccountCredentials.from_p12_keyfile( google_cloud_options.service_account_name, google_cloud_options.service_account_key_file, -client_scopes, -user_agent=user_agent) +private_key_password=None, +scopes=client_scopes) except ImportError: with file(google_cloud_options.service_account_key_file) as f: service_account_key = f.read() @@ -162,7 +163,6 @@ def get_service_credentials(): service_account_key, client_scopes, user_agent=user_agent) - else: try: credentials = _GCloudWrapperCredentials(user_agent) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth_test.py -- diff --git a/sdks/python/apache_beam/internal/auth_test.py b/sdks/python/apache_beam/internal/auth_test.py new file mode 100644 index 000..dfd408e --- /dev/null +++ b/sdks/python/apache_beam/internal/auth_test.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Unit tests for the auth module.""" + +import os +import sys +import unittest + +import mock + +from apache_beam.internal import auth + + +class AuthTest(unittest.TestCase): + + def test_create_application_client(self): +try: + test_args = [ + 'test', '--service_account_name', 'abc', '--service_account_key_file', + os.path.join( + os.path.dirname(__file__), '..', 'tests/data/privatekey.p12')] + with mock.patch.object(sys, 'argv', test_args): +credentials = auth.get_service_credentials() +self.assertIsNotNone(credentials) +except NotImplementedError: + self.skipTest('service account tests require pyOpe
[2/2] incubator-beam git commit: Closes #1491
Closes #1491 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a175a5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a175a5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a175a5f Branch: refs/heads/python-sdk Commit: 9a175a5fe17d087f2c3ca0ff8e6d53faad6beab4 Parents: 7c0bf25 f23b717 Author: Robert Bradshaw Authored: Fri Dec 2 22:14:33 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:14:33 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 6 +-- sdks/python/apache_beam/internal/auth_test.py | 44 +++ sdks/python/apache_beam/tests/data/README.md| 20 + .../apache_beam/tests/data/privatekey.p12 | Bin 0 -> 2452 bytes sdks/python/setup.py| 2 +- 5 files changed, 68 insertions(+), 4 deletions(-) --
[1/2] incubator-beam git commit: Add labels to lambdas in write finalization
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 72fa21f98 -> 7c0bf25fa Add labels to lambdas in write finalization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51e97d4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51e97d4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51e97d4a Branch: refs/heads/python-sdk Commit: 51e97d4a8d3f25608b6ee80f57c973186798d54f Parents: 72fa21f Author: Charles Chen Authored: Fri Dec 2 15:17:55 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:12:35 2016 -0800 -- sdks/python/apache_beam/io/iobase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51e97d4a/sdks/python/apache_beam/io/iobase.py -- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index b7cac3e..fd6ae57 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -767,10 +767,10 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (pcoll | core.ParDo('write_bundles', _WriteBundleDoFn(), self.sink, AsSingleton(init_result_coll)) - | core.Map(lambda x: (None, x)) + | core.Map('pair', lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | core.FlatMap(lambda x: x[1])) + | core.FlatMap('extract', lambda x: x[1])) return do_once | core.FlatMap( 'finalize_write', _finalize_write,
[2/2] incubator-beam git commit: Closes #1496
Closes #1496 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c0bf25f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c0bf25f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c0bf25f Branch: refs/heads/python-sdk Commit: 7c0bf25fadbe2e74ab62c87c90111b8b7c34e297 Parents: 72fa21f 51e97d4 Author: Robert Bradshaw Authored: Fri Dec 2 22:12:36 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:12:36 2016 -0800 -- sdks/python/apache_beam/io/iobase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Make the legacy SQL flag consistent between Java and Python
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 8365b6838 -> 72fa21f98 Make the legacy SQL flag consistent between Java and Python Renamed the BigQuery use_legacy_sql parameter to use_standard_sql. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72721031 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72721031 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72721031 Branch: refs/heads/python-sdk Commit: 727210318404a585bb7742591ade0a09ccc20444 Parents: 8365b68 Author: Sourabh Bajaj Authored: Fri Dec 2 16:45:19 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:10:21 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/bigquery.py| 9 + sdks/python/apache_beam/io/bigquery_test.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 580a3d7..6dcf05e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -962,7 +962,7 @@ def model_bigqueryio(): 'ReadYearAndTemp', beam.io.BigQuerySource( query='SELECT year, mean_temp FROM `samples.weather_stations`', - use_legacy_sql=False)) + use_standard_sql=True)) # [END model_bigqueryio_query_standard_sql] # [START model_bigqueryio_schema] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 0885e3a..ce75e10 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -323,7 +323,7 @@ class 
BigQuerySource(dataflow_io.NativeSource): """A source based on a BigQuery table.""" def __init__(self, table=None, dataset=None, project=None, query=None, - validate=False, coder=None, use_legacy_sql=True): + validate=False, coder=None, use_standard_sql=False): """Initialize a BigQuerySource. Args: @@ -351,8 +351,8 @@ class BigQuerySource(dataflow_io.NativeSource): in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable. - useLegacySql: Specifies whether to use BigQuery's legacy -SQL dialect for this query. The default value is true. If set to false, + use_standard_sql: Specifies whether to use BigQuery's standard +SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery's updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs. @@ -374,7 +374,8 @@ class BigQuerySource(dataflow_io.NativeSource): self.use_legacy_sql = True else: self.query = query - self.use_legacy_sql = use_legacy_sql + # TODO(BEAM-1082): Change the internal flag to be standard_sql + self.use_legacy_sql = not use_standard_sql self.table_reference = None self.validate = validate http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index e263e13..a2cf947 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -199,7 +199,7 @@ class TestBigQuerySource(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_specify_query_sql_format(self): -source = beam.io.BigQuerySource(query='my_query', use_legacy_sql=False) +source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True) self.assertEqual(source.query, 'my_query') self.assertFalse(source.use_legacy_sql) @@ 
-371,7 +371,7 @@ class TestBigQueryReader(unittest.TestCase): jobComplete=True, rows=table_rows, schema=schema) actual_rows = [] with beam.io.BigQuerySource( -query='query', use_legacy_sql=False).reader(client) as reader: +query='query', use_standard_sql=True).reader(client) as reader: for row in reader: actual_rows.append(row)
[2/2] incubator-beam git commit: Closes #1497
Closes #1497 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72fa21f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72fa21f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72fa21f9 Branch: refs/heads/python-sdk Commit: 72fa21f98527e57cd5c7fad3977c95d7c994325e Parents: 8365b68 7272103 Author: Robert Bradshaw Authored: Fri Dec 2 22:10:22 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:10:22 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/bigquery.py| 9 + sdks/python/apache_beam/io/bigquery_test.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) --
[2/2] incubator-beam git commit: Closes #1494
Closes #1494 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8365b683 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8365b683 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8365b683 Branch: refs/heads/python-sdk Commit: 8365b6838eda6dcef39097ab19b85b9af270914f Parents: fd6a52c 16ffdb2 Author: Robert Bradshaw Authored: Fri Dec 2 22:06:42 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:06:42 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/internal/apiclient_test.py | 1 + sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++--- sdks/python/apache_beam/io/datastore/v1/helper.py | 4 +++- 4 files changed, 8 insertions(+), 9 deletions(-) --
[1/2] incubator-beam git commit: Fix auth related unit test failures
Repository: incubator-beam Updated Branches: refs/heads/python-sdk fd6a52c15 -> 8365b6838 Fix auth related unit test failures Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16ffdb25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16ffdb25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16ffdb25 Branch: refs/heads/python-sdk Commit: 16ffdb25f6029c4bee71f035d8d9747f6330ec9f Parents: fd6a52c Author: Vikas Kedigehalli Authored: Fri Dec 2 14:13:31 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:06:41 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/internal/apiclient_test.py | 1 + sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++--- sdks/python/apache_beam/io/datastore/v1/helper.py | 4 +++- 4 files changed, 8 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index c2a047f..580a3d7 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -919,7 +919,7 @@ def model_datastoreio(): # [START model_datastoreio_write] p = beam.Pipeline(options=PipelineOptions()) musicians = p | 'Musicians' >> beam.Create( - ['Mozart', 'Chopin', 'Beethoven', 'Bach']) + ['Mozart', 'Chopin', 'Beethoven', 'Vivaldi']) def to_entity(content): entity = entity_pb2.Entity() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/internal/apiclient_test.py -- diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py index 66cc8db..31b2dad 100644 --- a/sdks/python/apache_beam/internal/apiclient_test.py +++ 
b/sdks/python/apache_beam/internal/apiclient_test.py @@ -25,6 +25,7 @@ from apache_beam.internal import apiclient class UtilTest(unittest.TestCase): + @unittest.skip("Enable once BEAM-1080 is fixed.") def test_create_application_client(self): pipeline_options = PipelineOptions() apiclient.DataflowApplicationClient( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index 054002f..20466b9 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -22,7 +22,6 @@ import logging from google.datastore.v1 import datastore_pb2 from googledatastore import helper as datastore_helper -from apache_beam.internal import auth from apache_beam.io.datastore.v1 import helper from apache_beam.io.datastore.v1 import query_splitter from apache_beam.transforms import Create @@ -154,8 +153,7 @@ class ReadFromDatastore(PTransform): self._num_splits = num_splits def start_bundle(self, context): - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, p_context, *args, **kwargs): # distinct key to be used to group query splits. 
@@ -210,8 +208,7 @@ class ReadFromDatastore(PTransform): self._datastore = None def start_bundle(self, context): - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, p_context, *args, **kwargs): query = p_context.element @@ -341,8 +338,7 @@ class _Mutate(PTransform): def start_bundle(self, context): self._mutations = [] - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, context): self._mutations.append(context.element) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/helper.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io
[2/2] incubator-beam git commit: Closes #1481
Closes #1481 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2363ee51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2363ee51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2363ee51 Branch: refs/heads/python-sdk Commit: 2363ee510694dab20d925f7d4979fc6dcd495477 Parents: a463f00 557a2f9 Author: Robert Bradshaw Authored: Fri Dec 2 11:34:29 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:34:29 2016 -0800 -- .../apache_beam/examples/snippets/snippets.py | 41 .../examples/snippets/snippets_test.py | 9 - 2 files changed, 49 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Add snippet for datastoreio
Repository: incubator-beam Updated Branches: refs/heads/python-sdk a463f000e -> 2363ee510 Add snippet for datastoreio Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/557a2f92 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/557a2f92 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/557a2f92 Branch: refs/heads/python-sdk Commit: 557a2f92f67bfc545533fa35852485e9c4c0b785 Parents: a463f00 Author: Vikas Kedigehalli Authored: Thu Dec 1 10:27:05 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:34:28 2016 -0800 -- .../apache_beam/examples/snippets/snippets.py | 41 .../examples/snippets/snippets_test.py | 9 - 2 files changed, 49 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 8d05130..c2a047f 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -891,6 +891,47 @@ def model_textio(renames): p.run() +def model_datastoreio(): + """Using a Read and Write transform to read/write to Cloud Datastore. 
+ + URL: https://cloud.google.com/dataflow/model/datastoreio + """ + + import uuid + from google.datastore.v1 import entity_pb2 + from google.datastore.v1 import query_pb2 + import googledatastore + import apache_beam as beam + from apache_beam.utils.options import PipelineOptions + from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore + from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore + + project = 'my_project' + kind = 'my_kind' + query = query_pb2.Query() + query.kind.add().name = kind + + # [START model_datastoreio_read] + p = beam.Pipeline(options=PipelineOptions()) + entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query) + # [END model_datastoreio_read] + + # [START model_datastoreio_write] + p = beam.Pipeline(options=PipelineOptions()) + musicians = p | 'Musicians' >> beam.Create( + ['Mozart', 'Chopin', 'Beethoven', 'Bach']) + + def to_entity(content): +entity = entity_pb2.Entity() +googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4())) +googledatastore.helper.add_properties(entity, {'content': unicode(content)}) +return entity + + entities = musicians | 'To Entity' >> beam.Map(to_entity) + entities | 'Write To Datastore' >> WriteToDatastore(project) + # [END model_datastoreio_write] + + def model_bigqueryio(): """Using a Read and Write transform to read/write to BigQuery. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets_test.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 72fccb2..09b4ba4 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -470,11 +470,18 @@ class SnippetsTest(unittest.TestCase): ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'], self.get_output(result_path, suffix='.csv')) + def test_model_datastoreio(self): +# We cannot test datastoreio functionality in unit tests therefore we limit +# ourselves to making sure the pipeline containing Datastore read and write +# transforms can be built. +# TODO(vikasrk): Expore using Datastore Emulator. +snippets.model_datastoreio() + def test_model_bigqueryio(self): # We cannot test BigQueryIO functionality in unit tests therefore we limit # ourselves to making sure the pipeline containing BigQuery sources and # sinks can be built. -self.assertEqual(None, snippets.model_bigqueryio()) +snippets.model_bigqueryio() def _run_test_pipeline_for_options(self, fn): temp_path = self.create_temp_file('aa\nbb\ncc')
[1/2] incubator-beam git commit: auth: add application default credentials as fallback
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 9ded359da -> a463f000e auth: add application default credentials as fallback Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01bddf29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01bddf29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01bddf29 Branch: refs/heads/python-sdk Commit: 01bddf296dfb84a70bc733add6a76c76cf6afaef Parents: 9ded359 Author: Vikas Kedigehalli Authored: Wed Nov 30 17:55:20 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:29:41 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 37 +++- .../apache_beam/io/datastore/v1/datastoreio.py | 10 -- .../apache_beam/io/datastore/v1/helper.py | 6 ++-- 3 files changed, 37 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01bddf29/sdks/python/apache_beam/internal/auth.py -- diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py index f324a2d..a043fcf 100644 --- a/sdks/python/apache_beam/internal/auth.py +++ b/sdks/python/apache_beam/internal/auth.py @@ -24,7 +24,7 @@ import os import sys import urllib2 - +from oauth2client.client import GoogleCredentials from oauth2client.client import OAuth2Credentials from apache_beam.utils import processes @@ -125,6 +125,14 @@ def get_service_credentials(): # them again. return GCEMetadataCredentials(user_agent=user_agent) else: +client_scopes = [ +'https://www.googleapis.com/auth/bigquery', +'https://www.googleapis.com/auth/cloud-platform', +'https://www.googleapis.com/auth/devstorage.full_control', +'https://www.googleapis.com/auth/userinfo.email', +'https://www.googleapis.com/auth/datastore' +] + # We are currently being run from the command line. 
google_cloud_options = PipelineOptions( sys.argv).view_as(GoogleCloudOptions) @@ -135,13 +143,6 @@ def get_service_credentials(): if not os.path.exists(google_cloud_options.service_account_key_file): raise AuthenticationException( 'Specified service account key file does not exist.') - client_scopes = [ - 'https://www.googleapis.com/auth/bigquery', - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/devstorage.full_control', - 'https://www.googleapis.com/auth/userinfo.email', - 'https://www.googleapis.com/auth/datastore' - ] # The following code uses oauth2client >=2.0.0 functionality and if this # is not available due to import errors will use 1.5.2 functionality. @@ -163,4 +164,22 @@ def get_service_credentials(): user_agent=user_agent) else: - return _GCloudWrapperCredentials(user_agent) + try: +credentials = _GCloudWrapperCredentials(user_agent) +# Check if we are able to get an access token. If not fallback to +# application default credentials. +credentials.get_access_token() +return credentials + except AuthenticationException: +logging.warning('Unable to find credentials from gcloud.') + + # Falling back to application default credentials. 
+ try: +credentials = GoogleCredentials.get_application_default() +credentials = credentials.create_scoped(client_scopes) +logging.debug('Connecting using Google Application Default ' + 'Credentials.') +return credentials + except Exception: +logging.warning('Unable to find default credentials to use.') +raise http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01bddf29/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index 20466b9..054002f 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -22,6 +22,7 @@ import logging from google.datastore.v1 import datastore_pb2 from googledatastore import helper as datastore_helper +from apache_beam.internal import auth from apache_beam.io.datastore.v1 import helper from apache_beam.io.datastore.v1 import query_splitter from apache_beam.transforms import Create @@ -153,7 +154,8 @@ class ReadFromDatastore(PTransform): self._num_splits = num_splits def start_bundle(self, context): - self._datastore = helper.get_datastore(self._project) + self._data
[2/2] incubator-beam git commit: Closes #1476
Closes #1476 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a463f000 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a463f000 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a463f000 Branch: refs/heads/python-sdk Commit: a463f000e5bc953aab99735ecdb01b4a2bcda828 Parents: 9ded359 01bddf2 Author: Robert Bradshaw Authored: Fri Dec 2 11:29:42 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:29:42 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 37 +++- .../apache_beam/io/datastore/v1/datastoreio.py | 10 -- .../apache_beam/io/datastore/v1/helper.py | 6 ++-- 3 files changed, 37 insertions(+), 16 deletions(-) --
[2/2] incubator-beam git commit: Closes #1472
Closes #1472 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ded359d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ded359d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ded359d Branch: refs/heads/python-sdk Commit: 9ded359daefc6040d61a1f33c77563474fcb09b6 Parents: 4414e20 dc68365 Author: Robert Bradshaw Authored: Thu Dec 1 09:40:32 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:40:32 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 9 + 1 file changed, 9 insertions(+) --
[1/2] incubator-beam git commit: Add snippet for standard sql
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 4414e20a6 -> 9ded359da Add snippet for standard sql Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc68365f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc68365f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc68365f Branch: refs/heads/python-sdk Commit: dc68365f01b4e33ac459abbe2c6fd5567f7b5bc7 Parents: 4414e20 Author: Sourabh Bajaj Authored: Wed Nov 30 14:59:15 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:40:31 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc68365f/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 891f464..8d05130 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -915,6 +915,15 @@ def model_bigqueryio(): query='SELECT year, mean_temp FROM samples.weather_stations')) # [END model_bigqueryio_query] + # [START model_bigqueryio_query_standard_sql] + p = beam.Pipeline(options=PipelineOptions()) + weather_data = p | beam.io.Read( + 'ReadYearAndTemp', + beam.io.BigQuerySource( + query='SELECT year, mean_temp FROM `samples.weather_stations`', + use_legacy_sql=False)) + # [END model_bigqueryio_query_standard_sql] + # [START model_bigqueryio_schema] schema = 'source:STRING, quote:STRING' # [END model_bigqueryio_schema]
[2/2] incubator-beam git commit: Improve size estimation speed for file samples
Improve size estimation speed for file samples Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9caaea0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9caaea0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9caaea0f Branch: refs/heads/python-sdk Commit: 9caaea0f15131ecc64c56ce579361094edc50ae5 Parents: 739a431 Author: Sourabh Bajaj Authored: Wed Nov 30 12:18:04 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:10:06 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 13 ++--- 1 file changed, 6 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9caaea0f/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 14eaf27..14c2b06 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -188,11 +188,12 @@ class FileBasedSource(iobase.BoundedSource): def estimate_size(self): file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] +# We're reading very few files so we can pass file names to +# _estimate_sizes_of_files without pattern as otherwise we'll try to do +# optimization based on the pattern and might end up reading much more +# data than needed for a few files. if (len(file_names) <= FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT): - # We're reading very few files so we can pass names without pattern - # as otherwise we'll try to do optimization based on the pattern and - # might end up reading much more data than needed for a few files. return sum(self._estimate_sizes_of_files(file_names)) else: # Estimating size of a random sample. 
@@ -202,10 +203,8 @@ class FileBasedSource(iobase.BoundedSource): int(len(file_names) * FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT)) sample = random.sample(file_names, sample_size) - estimate = self._estimate_sizes_of_files(sample, self._pattern) - return int( - sum(estimate) * - (float(len(file_names)) / len(sample))) + estimate = self._estimate_sizes_of_files(sample) + return int(sum(estimate) * (float(len(file_names)) / len(sample))) def read(self, range_tracker): return self._get_concat_source().read(range_tracker)
[1/2] incubator-beam git commit: Closes #1467
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 739a43197 -> 4414e20a6 Closes #1467 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4414e20a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4414e20a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4414e20a Branch: refs/heads/python-sdk Commit: 4414e20a67fb902be798c477757563c819bdfece Parents: 739a431 9caaea0 Author: Robert Bradshaw Authored: Thu Dec 1 09:10:06 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:10:06 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 13 ++--- 1 file changed, 6 insertions(+), 7 deletions(-) --
[1/2] incubator-beam git commit: Parse table schema from JSON
Repository: incubator-beam Updated Branches: refs/heads/python-sdk aa9071d56 -> 739a43197 Parse table schema from JSON Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4c2f62b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4c2f62b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4c2f62b Branch: refs/heads/python-sdk Commit: b4c2f62be8a809b666089e7b2fe5dada9cbd6c16 Parents: aa9071d Author: Sourabh Bajaj Authored: Wed Nov 30 13:48:28 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:07:27 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 37 sdks/python/apache_beam/io/bigquery_test.py | 22 ++ 2 files changed, 59 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 8d7892a..0885e3a 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -200,6 +200,43 @@ class TableRowJsonCoder(coders.Coder): f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()]) +def parse_table_schema_from_json(schema_string): + """Parse the Table Schema provided as string. + + Args: +schema_string: String serialized table schema, should be a valid JSON. + + Returns: +A TableSchema of the BigQuery export from either the Query or the Table. + """ + json_schema = json.loads(schema_string) + + def _parse_schema_field(field): +"""Parse a single schema field from dictionary. + +Args: + field: Dictionary object containing serialized schema. + +Returns: + A TableFieldSchema for a single column in BigQuery. 
+""" +schema = bigquery.TableFieldSchema() +schema.name = field['name'] +schema.type = field['type'] +if 'mode' in field: + schema.mode = field['mode'] +else: + schema.mode = 'NULLABLE' +if 'description' in field: + schema.description = field['description'] +if 'fields' in field: + schema.fields = [_parse_schema_field(x) for x in field['fields']] +return schema + + fields = [_parse_schema_field(f) for f in json_schema['fields']] + return bigquery.TableSchema(fields=fields) + + class BigQueryDisposition(object): """Class holding standard strings used for create and write dispositions.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index b0c3bbe..e263e13 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -32,6 +32,7 @@ from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder +from apache_beam.io.bigquery import parse_table_schema_from_json from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.options import PipelineOptions @@ -113,6 +114,27 @@ class TestTableRowJsonCoder(unittest.TestCase): self.json_compliance_exception(float('-inf')) +class TestTableSchemaParser(unittest.TestCase): + def test_parse_table_schema_from_json(self): +string_field = bigquery.TableFieldSchema( +name='s', type='STRING', mode='NULLABLE', description='s description') +number_field = bigquery.TableFieldSchema( +name='n', type='INTEGER', mode='REQUIRED', description='n description') +record_field = bigquery.TableFieldSchema( +name='r', type='RECORD', mode='REQUIRED', description='r description', 
+fields=[string_field, number_field]) +expected_schema = bigquery.TableSchema(fields=[record_field]) +json_str = json.dumps({'fields': [ +{'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', + 'description': 'r description', 'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', + 'description': 's description'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', + 'description': 'n description'}]}]}) +self.assertEqual(parse_table_schema_from_json(json_str), + expected_schema) + + class TestBigQuerySource(unittest.TestCase): def test_display_data_item_on_validate_true(self):
[2/2] incubator-beam git commit: Closes #1468
Closes #1468 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/739a4319 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/739a4319 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/739a4319 Branch: refs/heads/python-sdk Commit: 739a431975120fe267e6b81635ce6e2356bd2895 Parents: aa9071d b4c2f62 Author: Robert Bradshaw Authored: Thu Dec 1 09:07:28 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:07:28 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 37 sdks/python/apache_beam/io/bigquery_test.py | 22 ++ 2 files changed, 59 insertions(+) --
[2/2] incubator-beam git commit: Closes #1454
Closes #1454 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa9071d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa9071d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa9071d5 Branch: refs/heads/python-sdk Commit: aa9071d56120a1c0f91cc580fb956570113ee104 Parents: 70c1de9 495a2d8 Author: Robert Bradshaw Authored: Wed Nov 30 14:08:58 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 30 14:08:58 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 210 +++ .../apache_beam/examples/datastore_wordcount.py | 203 -- sdks/python/setup.py| 2 +- 3 files changed, 211 insertions(+), 204 deletions(-) --
[1/2] incubator-beam git commit: Few datastoreio fixes
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 70c1de9b9 -> aa9071d56 Few datastoreio fixes - pin googledatastore version to 6.4.1 - add num_shards options to datastore wordcount example - move datastore wordcount example to cookbook directory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/495a2d8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/495a2d8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/495a2d8c Branch: refs/heads/python-sdk Commit: 495a2d8c01949e4248d7e6dc9ad7a04168a292d1 Parents: 70c1de9 Author: Vikas Kedigehalli Authored: Tue Nov 29 10:02:14 2016 -0800 Committer: Vikas Kedigehalli Committed: Wed Nov 30 12:18:53 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 210 +++ .../apache_beam/examples/datastore_wordcount.py | 203 -- sdks/python/setup.py| 2 +- 3 files changed, 211 insertions(+), 204 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py new file mode 100644 index 000..eb62614 --- /dev/null +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -0,0 +1,210 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A word-counting workflow that uses Google Cloud Datastore.""" + +from __future__ import absolute_import + +import argparse +import logging +import re +import uuid + +from google.datastore.v1 import entity_pb2 +from google.datastore.v1 import query_pb2 +from googledatastore import helper as datastore_helper, PropertyFilter + +import apache_beam as beam +from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore +from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore +from apache_beam.utils.options import GoogleCloudOptions +from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.options import SetupOptions + +empty_line_aggregator = beam.Aggregator('emptyLines') +average_word_size_aggregator = beam.Aggregator('averageWordLength', + beam.combiners.MeanCombineFn(), + float) + + +class WordExtractingDoFn(beam.DoFn): + """Parse each line of input text into words.""" + + def process(self, context): +"""Returns an iterator over words in contents of Cloud Datastore entity. +The element is a line of text. If the line is blank, note that, too. +Args: + context: the call-specific context: data and aggregator. +Returns: + The processed element. 
+""" +content_value = context.element.properties.get('content', None) +text_line = '' +if content_value: + text_line = content_value.string_value + +if not text_line: + context.aggregate_to(empty_line_aggregator, 1) +words = re.findall(r'[A-Za-z\']+', text_line) +for w in words: + context.aggregate_to(average_word_size_aggregator, len(w)) +return words + + +class EntityWrapper(object): + """Create a Cloud Datastore entity from the given string.""" + def __init__(self, namespace, kind, ancestor): +self._namespace = namespace +self._kind = kind +self._ancestor = ancestor + + def make_entity(self, content): +entity = entity_pb2.Entity() +if self._namespace is not None: + entity.key.partition_id.namespace_id = self._namespace + +# All entities created will have the same ancestor +datastore_helper.add_key_path(entity.key, self._kind, self._ancestor, + self._kind, str(uuid.uuid4())) + +datastore_helper.add_properties(entity, {"content": unicode(content)}) +return entity + + +def write_to_datastore(project, user_options, pipeline_
[1/2] incubator-beam git commit: Improve GcsIO throughput by 10x
Repository: incubator-beam Updated Branches: refs/heads/python-sdk cce4331dc -> c1440f7aa Improve GcsIO throughput by 10x This change increases the read buffer used from 1M to 16M. Previously, the speed of reading an incompressible file was: (50 MB: 3.17 MB/s, 100 MB: 3.79 MB/s, 200 MB: 4.13 MB/s, 400 MB: 4.24 MB/s). The speed is now improved to: (50 MB: 24.21 MB/s, 100 MB: 42.70 MB/s, 200 MB: 42.89 MB/s, 400 MB: 46.92 MB/s). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4a332d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4a332d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4a332d9 Branch: refs/heads/python-sdk Commit: e4a332d9de5eca941e08f23242cd63bb83148312 Parents: cce4331 Author: Charles Chen Authored: Thu Nov 17 11:46:44 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 21:53:26 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 20 ++-- 1 file changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4a332d9/sdks/python/apache_beam/io/gcsio.py -- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 1b08994..4f310be 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -47,7 +47,23 @@ except ImportError: 'Google Cloud Storage I/O not supported for this execution environment ' '(could not import storage API client).') -DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 +# This is the size of each partial-file read operation from GCS. 
This +# parameter was chosen to give good throughput while keeping memory usage at +# a reasonable level; the following table shows throughput reached when +# reading files of a given size with a chosen buffer size and informed the +# choice of the value, as of 11/2016: +# +# +---++-+-+-+ +# | | 50 MB file | 100 MB file | 200 MB file | 400 MB file | +# +---++-+-+-+ +# | 8 MB buffer | 17.12 MB/s | 22.67 MB/s | 23.81 MB/s | 26.05 MB/s | +# | 16 MB buffer | 24.21 MB/s | 42.70 MB/s | 42.89 MB/s | 46.92 MB/s | +# | 32 MB buffer | 28.53 MB/s | 48.08 MB/s | 54.30 MB/s | 54.65 MB/s | +# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s | 79.13 MB/s | 85.39 MB/s | +# +---++-+-+-+ +DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 + +# This is the size of chunks used when writing to GCS. WRITE_CHUNK_SIZE = 8 * 1024 * 1024 @@ -373,7 +389,7 @@ class GcsBufferedReader(object): # Initialize read buffer state. self.download_stream = StringIO.StringIO() self.downloader = transfer.Download( -self.download_stream, auto_transfer=False) +self.download_stream, auto_transfer=False, chunksize=buffer_size) self.client.objects.Get(get_request, download=self.downloader) self.position = 0 self.buffer = ''
[2/2] incubator-beam git commit: Closes #1379
Closes #1379 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1440f7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1440f7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1440f7a Branch: refs/heads/python-sdk Commit: c1440f7aa69f0134d52463c4bcdcabce36b481d7 Parents: cce4331 e4a332d Author: Robert Bradshaw Authored: Fri Nov 18 21:53:27 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 21:53:27 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 20 ++-- 1 file changed, 18 insertions(+), 2 deletions(-) --
[2/2] incubator-beam git commit: Closes #1380
Closes #1380 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cce4331d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cce4331d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cce4331d Branch: refs/heads/python-sdk Commit: cce4331dc7ed95aa32654e77d2cc170b63437183 Parents: 45b420d 99bcafe Author: Robert Bradshaw Authored: Fri Nov 18 13:37:05 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:37:05 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 5 +++ sdks/python/apache_beam/io/fileio_test.py | 47 ++ 2 files changed, 52 insertions(+) --
[1/2] incubator-beam git commit: Fix issue where batch GCS renames were not issued
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 45b420d82 -> cce4331dc Fix issue where batch GCS renames were not issued Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99bcafe7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99bcafe7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99bcafe7 Branch: refs/heads/python-sdk Commit: 99bcafe7a02bbec5222d77abbad24f5eed8a687f Parents: 45b420d Author: Charles Chen Authored: Thu Nov 17 14:13:56 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:37:04 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 5 +++ sdks/python/apache_beam/io/fileio_test.py | 47 ++ 2 files changed, 52 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 3b67c4f..4d0eea6 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -514,6 +514,7 @@ class ChannelFactory(object): gcs_batches = [] gcs_current_batch = [] for src, dest in src_dest_pairs: + gcs_current_batch.append((src, dest)) if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] @@ -893,6 +894,8 @@ class FileSink(iobase.Sink): exception_infos = ChannelFactory.rename_batch(batch) for src, dest, exception in exception_infos: if exception: + logging.warning('Rename not successful: %s -> %s, %s', src, dest, + exception) should_report = True if isinstance(exception, IOError): # May have already been copied. @@ -906,6 +909,8 @@ class FileSink(iobase.Sink): logging.warning(('Exception in _rename_batch. 
src: %s, ' 'dest: %s, err: %s'), src, dest, exception) exceptions.append(exception) +else: + logging.debug('Rename successful: %s -> %s', src, dest) return exceptions # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio_test.py -- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 63e71e0..9d1e424 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -28,6 +28,7 @@ import unittest import zlib import hamcrest as hc +import mock import apache_beam as beam from apache_beam import coders @@ -881,6 +882,52 @@ class TestFileSink(unittest.TestCase): with self.assertRaises(Exception): list(sink.finalize_write(init_token, [res1, res2])) + @mock.patch('apache_beam.io.fileio.ChannelFactory.rename') + @mock.patch('apache_beam.io.fileio.gcsio') + def test_rename_batch(self, *unused_args): +# Prepare mocks. +gcsio_mock = mock.MagicMock() +fileio.gcsio.GcsIO = lambda: gcsio_mock +fileio.ChannelFactory.rename = mock.MagicMock() +to_rename = [ +('gs://bucket/from1', 'gs://bucket/to1'), +('gs://bucket/from2', 'gs://bucket/to2'), +('/local/from1', '/local/to1'), +('gs://bucket/from3', 'gs://bucket/to3'), +('/local/from2', '/local/to2'), +] +gcsio_mock.copy_batch.side_effect = [[ +('gs://bucket/from1', 'gs://bucket/to1', None), +('gs://bucket/from2', 'gs://bucket/to2', None), +('gs://bucket/from3', 'gs://bucket/to3', None), +]] +gcsio_mock.delete_batch.side_effect = [[ +('gs://bucket/from1', None), +('gs://bucket/from2', None), +('gs://bucket/from3', None), +]] + +# Issue batch rename. +fileio.ChannelFactory.rename_batch(to_rename) + +# Verify mocks. 
+expected_local_rename_calls = [ +mock.call('/local/from1', '/local/to1'), +mock.call('/local/from2', '/local/to2'), +] +self.assertEqual(fileio.ChannelFactory.rename.call_args_list, + expected_local_rename_calls) +gcsio_mock.copy_batch.assert_called_once_with([ +('gs://bucket/from1', 'gs://bucket/to1'), +('gs://bucket/from2', 'gs://bucket/to2'), +('gs://bucket/from3', 'gs://bucket/to3'), +]) +gcsio_mock.delete_batch.assert_called_once_with([ +'gs://bucket/from1', +'gs://bucket/from2', +'gs://bucket/from3', +]) + if __name__ == '__main__': logging.getLogger().
[1/2] incubator-beam git commit: Remove redundant REQUIRED_PACKAGES
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3a0f01c8e -> 45b420d82 Remove redundant REQUIRED_PACKAGES Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/329be6e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/329be6e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/329be6e9 Branch: refs/heads/python-sdk Commit: 329be6e9a16bfe865d61c3d6041ec5fb6707fc6a Parents: 3a0f01c Author: Ahmet Altay Authored: Thu Nov 17 15:20:08 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:35:47 2016 -0800 -- sdks/python/setup.py | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/329be6e9/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b8034af..1299bbf 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -96,9 +96,6 @@ REQUIRED_PACKAGES = [ 'python-gflags>=2.0,<4.0.0', 'pyyaml>=3.10,<4.0.0', ] -REQUIRED_TEST_PACKAGES = [ -'pyhamcrest>=1.9,<2.0', -] REQUIRED_TEST_PACKAGES = [
[2/2] incubator-beam git commit: Closes #1383
Closes #1383 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45b420d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45b420d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45b420d8 Branch: refs/heads/python-sdk Commit: 45b420d82aa6f47e3d37f5aa5ba98378cdc01e9c Parents: 3a0f01c 329be6e Author: Robert Bradshaw Authored: Fri Nov 18 13:35:48 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:35:48 2016 -0800 -- sdks/python/setup.py | 3 --- 1 file changed, 3 deletions(-) --
[2/2] incubator-beam git commit: Closes #1385
Closes #1385 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a0f01c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a0f01c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a0f01c8 Branch: refs/heads/python-sdk Commit: 3a0f01c8edd36fe525b8ad155011dfb759dad2b4 Parents: b83f12b 93c5233 Author: Robert Bradshaw Authored: Fri Nov 18 13:33:46 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:33:46 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 14 ++ .../python/apache_beam/io/filebasedsource_test.py | 18 +- 2 files changed, 27 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Fixes a couple of issues of FileBasedSource.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk b83f12b9f -> 3a0f01c8e Fixes a couple of issues of FileBasedSource. (1) Updates code so that a user-specified coder properly gets set to sub-sources. (2) Currently each SingleFileSource takes a reference to FileBasedSource while FileBasedSource takes a reference to Concatsource. ConcatSource has a reference to list of SingleFileSources. This results in quadratic space complexity when serializing splits of a FileBasedSource. This CL fixes this issue by making sure that FileBasedSource is cloned before taking a reference to ConcatSource Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93c5233a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93c5233a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93c5233a Branch: refs/heads/python-sdk Commit: 93c5233a1bf28e9b13412b909c2ee877bd6cf635 Parents: b83f12b Author: Chamikara Jayalath Authored: Thu Nov 17 19:18:26 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:33:33 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 14 ++ .../python/apache_beam/io/filebasedsource_test.py | 18 +- 2 files changed, 27 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index c7bc27e..7d8f686 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -109,6 +109,12 @@ class FileBasedSource(iobase.BoundedSource): file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] sizes = FileBasedSource._estimate_sizes_in_parallel(file_names) + # We create a reference for FileBasedSource that will be serialized along + # with each _SingleFileSource. 
To prevent this FileBasedSource from having + # a reference to ConcatSource (resulting in quadratic space complexity) + # we clone it here. + file_based_source_ref = pickler.loads(pickler.dumps(self)) + for index, file_name in enumerate(file_names): if sizes[index] == 0: continue # Ignoring empty file. @@ -123,7 +129,7 @@ class FileBasedSource(iobase.BoundedSource): splittable = False single_file_source = _SingleFileSource( -self, file_name, +file_based_source_ref, file_name, 0, sizes[index], min_bundle_size=self._min_bundle_size, @@ -194,9 +200,6 @@ class FileBasedSource(iobase.BoundedSource): return self._get_concat_source().get_range_tracker(start_position, stop_position) - def default_output_coder(self): -return self._get_concat_source().default_output_coder() - def read_records(self, file_name, offset_range_tracker): """Returns a generator of records created by reading file 'file_name'. @@ -315,3 +318,6 @@ class _SingleFileSource(iobase.BoundedSource): def read(self, range_tracker): return self._file_based_source.read_records(self._file_name, range_tracker) + + def default_output_coder(self): +return self._file_based_source.default_output_coder() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource_test.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 7f4d8d3..a455cd3 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -533,6 +533,23 @@ class TestFileBasedSource(unittest.TestCase): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_splits_get_coder_from_fbs(self): +class DummyCoder(object): + val = 12345 + +class FileBasedSourceWithCoder(LineSource): + + def default_output_coder(self): +return DummyCoder() + +pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12]) +self.assertEqual(200, len(expected_data)) +fbs = 
FileBasedSourceWithCoder(pattern) +splits = [split for split in fbs.split(desired_bundle_size=50)] +self.assertTrue(len(splits)) +for split in splits: + self.assertEqual(DummyCoder.val, split.source.default_output_coder().val) + class TestSingleFileSource(unittest.TestCase): @@ -685,7 +702,6 @@ class TestSingleFileSource(unittest.TestCase): read_data.extend(data_from_split) self.ass
[2/2] incubator-beam git commit: Fix shared state across retry decorated functions
Fix shared state across retry decorated functions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7f689a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7f689a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7f689a7 Branch: refs/heads/python-sdk Commit: e7f689a72143cc5c821449c862c732f877ca4645 Parents: 115cf33 Author: Sourabh Bajaj Authored: Tue Nov 15 15:04:37 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:21:43 2016 -0800 -- sdks/python/apache_beam/utils/retry.py | 8 ++--- sdks/python/apache_beam/utils/retry_test.py | 42 2 files changed, 45 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry.py -- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 1f5af88..b3016fd 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -152,12 +152,10 @@ def with_exponential_backoff( def real_decorator(fun): """The real decorator whose purpose is to return the wrapped function.""" - -retry_intervals = iter( -FuzzedExponentialIntervals( -initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0)) - def wrapper(*args, **kwargs): + retry_intervals = iter( + FuzzedExponentialIntervals( + initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0)) while True: try: return fun(*args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry_test.py -- diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py index 705c555..7570ca0 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -164,5 +164,47 @@ class RetryTest(unittest.TestCase): self.assertEqual(func_name, 'transient_failure') +class DummyClass(object): + def 
__init__(self, results): +self.index = 0 +self.results = results + + @retry.with_exponential_backoff(num_retries=2, initial_delay_secs=0.1) + def func(self): +self.index += 1 +if self.index > len(self.results) or \ +self.results[self.index - 1] == "Error": + raise ValueError("Error") +return self.results[self.index - 1] + + +class RetryStateTest(unittest.TestCase): + """The test_two_failures and test_single_failure would fail if we have + any shared state for the retry decorator. This test tries to prevent a bug we + found where the state in the decorator was shared across objects and retries + were not available correctly. + + The test_call_two_objects would test this inside the same test. + """ + def test_two_failures(self): +dummy = DummyClass(["Error", "Error", "Success"]) +dummy.func() +self.assertEqual(3, dummy.index) + + def test_single_failure(self): +dummy = DummyClass(["Error", "Success"]) +dummy.func() +self.assertEqual(2, dummy.index) + + def test_call_two_objects(self): +dummy = DummyClass(["Error", "Error", "Success"]) +dummy.func() +self.assertEqual(3, dummy.index) + +dummy2 = DummyClass(["Error", "Success"]) +dummy2.func() +self.assertEqual(2, dummy2.index) + + if __name__ == '__main__': unittest.main()
[1/2] incubator-beam git commit: Closes #1365
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 115cf33ed -> b83f12b9f Closes #1365 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b83f12b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b83f12b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b83f12b9 Branch: refs/heads/python-sdk Commit: b83f12b9fff05755323afd655966bf7c3ee03334 Parents: 115cf33 e7f689a Author: Robert Bradshaw Authored: Thu Nov 17 12:21:43 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:21:43 2016 -0800 -- sdks/python/apache_beam/utils/retry.py | 8 ++--- sdks/python/apache_beam/utils/retry_test.py | 42 2 files changed, 45 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Upgrade Datastore version
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 21b7844bb -> 115cf33ed Upgrade Datastore version Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/249c9f4c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/249c9f4c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/249c9f4c Branch: refs/heads/python-sdk Commit: 249c9f4cdfae17b1f760cbdeb2a7446439fd Parents: 21b7844 Author: Vikas Kedigehalli Authored: Thu Nov 17 10:43:35 2016 -0800 Committer: Vikas Kedigehalli Committed: Thu Nov 17 10:43:35 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/249c9f4c/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index af59069..b8034af 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -87,7 +87,7 @@ REQUIRED_PACKAGES = [ 'avro>=1.7.7,<2.0.0', 'dill>=0.2.5,<0.3', 'google-apitools>=0.5.2,<1.0.0', -'googledatastore==6.3.0', +'googledatastore==6.4.0', 'httplib2>=0.8,<0.10', 'mock>=1.0.1,<3.0.0', 'oauth2client>=2.0.1,<4.0.0',
[2/2] incubator-beam git commit: Closes #1375
Closes #1375 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/115cf33e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/115cf33e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/115cf33e Branch: refs/heads/python-sdk Commit: 115cf33ede4f6e0267a9ec8757620efb51ceb904 Parents: 21b7844 249c9f4 Author: Robert Bradshaw Authored: Thu Nov 17 12:20:41 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:20:41 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Query Splitter for Datastore v1
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d1fccbf5e -> 21b7844bb Query Splitter for Datastore v1 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70 Branch: refs/heads/python-sdk Commit: c1126b708469fc63bd8ab8e54026700408ec34da Parents: d1fccbf Author: Vikas Kedigehalli Authored: Mon Oct 24 18:29:29 2016 -0700 Committer: Robert Bradshaw Committed: Tue Nov 15 14:24:02 2016 -0800 -- .../python/apache_beam/io/datastore/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/helper.py | 84 ++ .../apache_beam/io/datastore/v1/helper_test.py | 124 + .../io/datastore/v1/query_splitter.py | 270 +++ .../io/datastore/v1/query_splitter_test.py | 257 ++ sdks/python/setup.py| 1 + 7 files changed, 768 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py -- diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py new file mode 100644 index 000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py new file mode 100644 index 000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py new file mode 100644 index 000..626ab35 --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/helper.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +
[2/2] incubator-beam git commit: Closes #1310
Closes #1310 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21b7844b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21b7844b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21b7844b Branch: refs/heads/python-sdk Commit: 21b7844bb05e9a86531876cffe8ee776bfaaa1cc Parents: d1fccbf c1126b7 Author: Robert Bradshaw Authored: Tue Nov 15 14:24:03 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 14:24:03 2016 -0800 -- .../python/apache_beam/io/datastore/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/helper.py | 84 ++ .../apache_beam/io/datastore/v1/helper_test.py | 124 + .../io/datastore/v1/query_splitter.py | 270 +++ .../io/datastore/v1/query_splitter_test.py | 257 ++ sdks/python/setup.py| 1 + 7 files changed, 768 insertions(+) --
[2/3] incubator-beam git commit: Fix merge lint error
Fix merge lint error Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bf25269 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bf25269 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bf25269 Branch: refs/heads/python-sdk Commit: 8bf2526965dd319f654e7f995df940307fb2260f Parents: 9d805ee Author: Robert Bradshaw Authored: Tue Nov 15 11:06:27 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:06:27 2016 -0800 -- sdks/python/apache_beam/io/textio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf25269/sdks/python/apache_beam/io/textio.py -- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 4e94f87..9c89b68 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -244,7 +244,8 @@ class ReadFromText(PTransform): self._strip_trailing_newlines = strip_trailing_newlines self._coder = coder self._source = _TextSource(file_pattern, min_bundle_size, compression_type, - strip_trailing_newlines, coder, validate=validate) + strip_trailing_newlines, coder, + validate=validate) def apply(self, pvalue): return pvalue.pipeline | Read(self._source)
[1/3] incubator-beam git commit: Display Data for: PipelineOptions, combiners, more sources
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 384fb5dc1 -> d1fccbf5e Display Data for: PipelineOptions, combiners, more sources Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d805eec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d805eec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d805eec Branch: refs/heads/python-sdk Commit: 9d805eec6b9cedb43b6e79e255483fc8fa6832d1 Parents: 384fb5d Author: Pablo Authored: Wed Nov 9 14:03:03 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:02:28 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 18 +++-- .../apache_beam/internal/json_value_test.py | 8 +- sdks/python/apache_beam/io/avroio.py| 20 - sdks/python/apache_beam/io/avroio_test.py | 78 sdks/python/apache_beam/io/fileio.py| 20 - sdks/python/apache_beam/io/fileio_test.py | 40 -- sdks/python/apache_beam/io/iobase.py| 5 +- sdks/python/apache_beam/io/textio.py| 25 +-- sdks/python/apache_beam/io/textio_test.py | 28 +++ sdks/python/apache_beam/pipeline_test.py| 12 +-- sdks/python/apache_beam/transforms/combiners.py | 14 .../apache_beam/transforms/combiners_test.py| 63 sdks/python/apache_beam/transforms/core.py | 16 +++- .../python/apache_beam/transforms/ptransform.py | 9 +++ sdks/python/apache_beam/utils/options.py| 17 - .../apache_beam/utils/pipeline_options_test.py | 46 ++-- sdks/python/setup.py| 3 + 17 files changed, 375 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 5ac9d6e..8992ec3 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -32,6 +32,7 @@ from apache_beam import utils from apache_beam.internal.auth import 
get_service_credentials from apache_beam.internal.json_value import to_json_value from apache_beam.transforms import cy_combiners +from apache_beam.transforms.display import DisplayData from apache_beam.utils import dependency from apache_beam.utils import retry from apache_beam.utils.dependency import get_required_container_version @@ -234,11 +235,18 @@ class Environment(object): self.proto.sdkPipelineOptions = ( dataflow.Environment.SdkPipelineOptionsValue()) - for k, v in sdk_pipeline_options.iteritems(): -if v is not None: - self.proto.sdkPipelineOptions.additionalProperties.append( - dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( - key=k, value=to_json_value(v))) + options_dict = {k: v + for k, v in sdk_pipeline_options.iteritems() + if v is not None} + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='options', value=to_json_value(options_dict))) + + dd = DisplayData.create_from(options) + items = [item.get_dict() for item in dd.items] + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='display_data', value=to_json_value(items))) class Job(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/json_value_test.py -- diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py index cfab293..a4a47b8 100644 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ b/sdks/python/apache_beam/internal/json_value_test.py @@ -76,14 +76,8 @@ class JsonValueTest(unittest.TestCase): self.assertEquals(long(27), from_json_value(to_json_value(long(27 def test_too_long_value(self): -try: +with self.assertRaises(TypeError): to_json_value(long(1 << 64)) -except TypeError as e: - pass -except Exception as e: - self.fail('Unexpected exception raised: {}'.format(e)) -else: - 
self.fail('TypeError not raised.') if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio.py -- diff --git a/sd
[3/3] incubator-beam git commit: Closes #1264
Closes #1264 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d1fccbf5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d1fccbf5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d1fccbf5 Branch: refs/heads/python-sdk Commit: d1fccbf5eb5064a2a1a6831bb523f8cdf705c8d8 Parents: 384fb5d 8bf2526 Author: Robert Bradshaw Authored: Tue Nov 15 11:06:50 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:06:50 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 18 +++-- .../apache_beam/internal/json_value_test.py | 8 +- sdks/python/apache_beam/io/avroio.py| 20 - sdks/python/apache_beam/io/avroio_test.py | 78 sdks/python/apache_beam/io/fileio.py| 20 - sdks/python/apache_beam/io/fileio_test.py | 40 -- sdks/python/apache_beam/io/iobase.py| 5 +- sdks/python/apache_beam/io/textio.py| 26 +-- sdks/python/apache_beam/io/textio_test.py | 28 +++ sdks/python/apache_beam/pipeline_test.py| 12 +-- sdks/python/apache_beam/transforms/combiners.py | 14 .../apache_beam/transforms/combiners_test.py| 63 sdks/python/apache_beam/transforms/core.py | 16 +++- .../python/apache_beam/transforms/ptransform.py | 9 +++ sdks/python/apache_beam/utils/options.py| 17 - .../apache_beam/utils/pipeline_options_test.py | 46 ++-- sdks/python/setup.py| 3 + 17 files changed, 376 insertions(+), 47 deletions(-) --
[2/3] incubator-beam git commit: Add a couple of missing coder tests.
Add a couple of missing coder tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab02a1d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab02a1d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab02a1d6 Branch: refs/heads/python-sdk Commit: ab02a1d60ce70f31c087a78a940bb4833b477ebb Parents: c4208a8 Author: Robert Bradshaw Authored: Wed Nov 9 13:47:53 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 10:48:35 2016 -0800 -- .../apache_beam/coders/coders_test_common.py| 24 1 file changed, 19 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab02a1d6/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index adeb6a5..2ec8e7f 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -59,13 +59,9 @@ class CodersTest(unittest.TestCase): 'Base' not in c.__name__) standard -= set([coders.Coder, coders.FastCoder, - coders.Base64PickleCoder, - coders.FloatCoder, coders.ProtoCoder, - coders.TimestampCoder, coders.ToStringCoder, - coders.WindowCoder, - coders.WindowedValueCoder]) + coders.WindowCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested @@ -155,6 +151,9 @@ class CodersTest(unittest.TestCase): self.check_coder(coders.FloatCoder(), *[float(2 ** (0.1 * x)) for x in range(-100, 100)]) self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf')) +self.check_coder( +coders.TupleCoder((coders.FloatCoder(), coders.FloatCoder())), +(0, 1), (-100, 100), (0.5, 0.25)) def test_singleton_coder(self): a = 'anything' @@ -173,6 +172,9 @@ class CodersTest(unittest.TestCase): self.check_coder(coders.TimestampCoder(), 
timestamp.Timestamp(micros=-1234567890123456789), timestamp.Timestamp(micros=1234567890123456789)) +self.check_coder( +coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())), +(timestamp.Timestamp.of(27), 'abc')) def test_tuple_coder(self): self.check_coder( @@ -209,6 +211,18 @@ class CodersTest(unittest.TestCase): coders.IterableCoder(coders.VarIntCoder(, (1, [1, 2, 3])) + def test_windowed_value_coder(self): +self.check_coder( +coders.WindowedValueCoder(coders.VarIntCoder()), +windowed_value.WindowedValue(3, -100, ()), +windowed_value.WindowedValue(-1, 100, (1, 2, 3))) +self.check_coder( +coders.TupleCoder(( +coders.WindowedValueCoder(coders.FloatCoder()), +coders.WindowedValueCoder(coders.StrUtf8Coder(, +(windowed_value.WindowedValue(1.5, 0, ()), + windowed_value.WindowedValue("abc", 10, ('window', + def test_proto_coder(self): # For instructions on how these test proto message were generated, # see coders_test.py
[GitHub] incubator-beam pull request #1326: Additional Coder tests
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1326 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/3] incubator-beam git commit: Also check coder determinism.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk c4208a899 -> 384fb5dc1 Also check coder determinism. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a078c04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a078c04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a078c04 Branch: refs/heads/python-sdk Commit: 0a078c04b89aee42413d3af9b3fb44b67cc2812b Parents: ab02a1d Author: Robert Bradshaw Authored: Wed Nov 9 13:56:31 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 10:48:35 2016 -0800 -- sdks/python/apache_beam/coders/coders_test_common.py | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a078c04/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 2ec8e7f..bfd4d77 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -91,6 +91,8 @@ class CodersTest(unittest.TestCase): copy2 = dill.loads(dill.dumps(coder)) for v in values: self.assertEqual(v, copy1.decode(copy2.encode(v))) + if coder.is_deterministic(): +self.assertEqual(copy1.encode(v), copy2.encode(v)) def test_custom_coder(self):
[3/3] incubator-beam git commit: Closes #1326
Closes #1326 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/384fb5dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/384fb5dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/384fb5dc Branch: refs/heads/python-sdk Commit: 384fb5dc15dd96cf639811dd4111cee32fc74085 Parents: c4208a8 0a078c0 Author: Robert Bradshaw Authored: Tue Nov 15 10:48:36 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 10:48:36 2016 -0800 -- .../apache_beam/coders/coders_test_common.py| 26 1 file changed, 21 insertions(+), 5 deletions(-) --
[2/2] incubator-beam git commit: Closes #1304
Closes #1304 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4208a89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4208a89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4208a89 Branch: refs/heads/python-sdk Commit: c4208a899c38a92acdd95ccf37cb53237a593535 Parents: 6ac6e42 d0e3121 Author: Robert Bradshaw Authored: Tue Nov 15 08:53:07 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:53:07 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 1 + sdks/python/apache_beam/utils/names.py | 1 + 2 files changed, 2 insertions(+) --
[1/2] incubator-beam git commit: Allow for passing format so that we can migrate to BQ Avro export later
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6ac6e420f -> c4208a899 Allow for passing format so that we can migrate to BQ Avro export later Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0e31218 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0e31218 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0e31218 Branch: refs/heads/python-sdk Commit: d0e312184e319050baa02abff2c08348b6cfb651 Parents: 6ac6e42 Author: Sourabh Bajaj Authored: Mon Nov 7 18:15:17 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:53:06 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 1 + sdks/python/apache_beam/utils/names.py | 1 + 2 files changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 57867fa..00b466b 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -515,6 +515,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') # TODO(silviuc): Add table validation if transform.source.validate. 
if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/utils/names.py -- diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py index be8c92a..3edde3c 100644 --- a/sdks/python/apache_beam/utils/names.py +++ b/sdks/python/apache_beam/utils/names.py @@ -46,6 +46,7 @@ class PropertyNames(object): BIGQUERY_DATASET = 'dataset' BIGQUERY_QUERY = 'bigquery_query' BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql' + BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format' BIGQUERY_TABLE = 'table' BIGQUERY_PROJECT = 'project' BIGQUERY_SCHEMA = 'schema'
[1/2] incubator-beam git commit: Add IP configuration to Python SDK
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 66f324b35 -> 6ac6e420f Add IP configuration to Python SDK Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af6f4e90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af6f4e90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af6f4e90 Branch: refs/heads/python-sdk Commit: af6f4e90a239dbf1a2ad6f0cd8974602dfa9e9b4 Parents: 66f324b Author: Sam McVeety Authored: Fri Nov 11 19:56:34 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:50:31 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 9 + sdks/python/apache_beam/utils/options.py | 4 2 files changed, 13 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 8c7cc29..5ac9d6e 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -210,6 +210,15 @@ class Environment(object): pool.teardownPolicy = ( dataflow.WorkerPool .TeardownPolicyValueValuesEnum.TEARDOWN_ON_SUCCESS) +if self.worker_options.use_public_ips is not None: + if self.worker_options.use_public_ips: +pool.ipConfiguration = ( +dataflow.WorkerPool +.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC) + else: +pool.ipConfiguration = ( +dataflow.WorkerPool +.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE) if self.standard_options.streaming: # Use separate data disk for streaming. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/utils/options.py -- diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py index ecc85ba..f68335b 100644 --- a/sdks/python/apache_beam/utils/options.py +++ b/sdks/python/apache_beam/utils/options.py @@ -348,6 +348,10 @@ class WorkerOptions(PipelineOptions): help= ('The teardown policy for the VMs. By default this is left unset and ' 'the service sets the default policy.')) +parser.add_argument( +'--use_public_ips', +default=None, +help='Whether to assign public IP addresses to the worker machines.') def validate(self, validator): errors = []
[2/2] incubator-beam git commit: Closes #1354
Closes #1354 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ac6e420 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ac6e420 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ac6e420 Branch: refs/heads/python-sdk Commit: 6ac6e420f4f91e3326b07753004d3a56f34b4226 Parents: 66f324b af6f4e9 Author: Robert Bradshaw Authored: Tue Nov 15 08:50:32 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:50:32 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 9 + sdks/python/apache_beam/utils/options.py | 4 2 files changed, 13 insertions(+) --
[1/2] incubator-beam git commit: Use batch GCS operations during FileSink write finalization
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 560fe79f8 -> 66f324b35 Use batch GCS operations during FileSink write finalization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1 Branch: refs/heads/python-sdk Commit: 313191e129b884e4e14e9f503a757147d368217c Parents: 560fe79 Author: Charles Chen Authored: Thu Nov 10 11:54:08 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:48:47 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 177 - sdks/python/apache_beam/io/fileio_test.py | 2 +- sdks/python/apache_beam/io/gcsio.py | 78 +++ sdks/python/apache_beam/io/gcsio_test.py | 103 +- sdks/python/apache_beam/utils/retry.py| 3 + 5 files changed, 298 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 669bfc9..ef20a7c 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -31,6 +31,7 @@ import zlib import weakref from apache_beam import coders +from apache_beam.io import gcsio from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.runners.dataflow.native_io import iobase as dataflow_io @@ -451,8 +452,6 @@ class ChannelFactory(object): 'was %s' % type(compression_type)) if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio raw_file = gcsio.GcsIO().open( path, mode, @@ -470,40 +469,92 @@ class ChannelFactory(object): return isinstance(fileobj, _CompressedFile) @staticmethod - def rename(src, dst): + def rename(src, dest): if src.startswith('gs://'): - assert 
dst.startswith('gs://'), dst - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio - gcsio.GcsIO().rename(src, dst) + if not dest.startswith('gs://'): +raise ValueError('Destination %r must be GCS path.', dest) + gcsio.GcsIO().rename(src, dest) else: try: -os.rename(src, dst) +os.rename(src, dest) except OSError as err: raise IOError(err) @staticmethod - def copytree(src, dst): + def rename_batch(src_dest_pairs): +# Filter out local and GCS operations. +local_src_dest_pairs = [] +gcs_src_dest_pairs = [] +for src, dest in src_dest_pairs: + if src.startswith('gs://'): +if not dest.startswith('gs://'): + raise ValueError('Destination %r must be GCS path.', dest) +gcs_src_dest_pairs.append((src, dest)) + else: +local_src_dest_pairs.append((src, dest)) + +# Execute local operations. +exceptions = [] +for src, dest in local_src_dest_pairs: + try: +ChannelFactory.rename(src, dest) + except Exception as e: # pylint: disable=broad-except +exceptions.append((src, dest, e)) + +# Execute GCS operations. +exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs) + +return exceptions + + @staticmethod + def _rename_gcs_batch(src_dest_pairs): +# Prepare batches. +gcs_batches = [] +gcs_current_batch = [] +for src, dest in src_dest_pairs: + if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: +gcs_batches.append(gcs_current_batch) +gcs_current_batch = [] +if gcs_current_batch: + gcs_batches.append(gcs_current_batch) + +# Execute GCS renames if any and return exceptions. 
+exceptions = [] +for batch in gcs_batches: + copy_statuses = gcsio.GcsIO().copy_batch(batch) + copy_succeeded = [] + for src, dest, exception in copy_statuses: +if exception: + exceptions.append((src, dest, exception)) +else: + copy_succeeded.append((src, dest)) + delete_batch = [src for src, dest in copy_succeeded] + delete_statuses = gcsio.GcsIO().delete_batch(delete_batch) + for i, (src, exception) in enumerate(delete_statuses): +dest = copy_succeeded[i] +if exception: + exceptions.append((src, dest, exception)) +return exceptions + + @staticmethod + def copytree(src, dest): if src.startswith('gs://'): - assert dst.startswith('gs://'), dst + if not dest.startswith('gs://'): +raise ValueError('Destination %
[2/2] incubator-beam git commit: Closes #1337
Closes #1337 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66f324b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66f324b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66f324b3 Branch: refs/heads/python-sdk Commit: 66f324b350c50720cc88357bc2d56e2ecd99adc8 Parents: 560fe79 313191e Author: Robert Bradshaw Authored: Tue Nov 15 08:48:48 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:48:48 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 177 - sdks/python/apache_beam/io/fileio_test.py | 2 +- sdks/python/apache_beam/io/gcsio.py | 78 +++ sdks/python/apache_beam/io/gcsio_test.py | 103 +- sdks/python/apache_beam/utils/retry.py| 3 + 5 files changed, 298 insertions(+), 65 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-852] Add validation to file based sources during create time
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 15e78b28a -> 560fe79f8 [BEAM-852] Add validation to file based sources during create time Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76ad2929 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76ad2929 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76ad2929 Branch: refs/heads/python-sdk Commit: 76ad29296fd57e1eec97bf40d9cf3a1d54a63a3f Parents: 15e78b2 Author: Sourabh Bajaj Authored: Mon Nov 14 15:40:10 2016 -0800 Committer: Robert Bradshaw Committed: Mon Nov 14 15:40:10 2016 -0800 -- sdks/python/apache_beam/io/avroio.py| 8 +++- sdks/python/apache_beam/io/bigquery.py | 2 +- sdks/python/apache_beam/io/filebasedsource.py | 16 +++- .../apache_beam/io/filebasedsource_test.py | 41 ++-- sdks/python/apache_beam/io/textio.py| 13 +-- 5 files changed, 60 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/avroio.py -- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 53ed95a..e7e73dd 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -37,7 +37,7 @@ __all__ = ['ReadFromAvro', 'WriteToAvro'] class ReadFromAvro(PTransform): """A ``PTransform`` for reading avro files.""" - def __init__(self, file_pattern=None, min_bundle_size=0): + def __init__(self, file_pattern=None, min_bundle_size=0, validate=True): """Initializes ``ReadFromAvro``. Uses source '_AvroSource' to read a set of Avro files defined by a given @@ -70,13 +70,17 @@ class ReadFromAvro(PTransform): file_pattern: the set of files to be read. min_bundle_size: the minimum size in bytes, to be considered when splitting the input into bundles. + validate: flag to verify that the files exist during the pipeline +creation time. 
**kwargs: Additional keyword arguments to be passed to the base class. """ super(ReadFromAvro, self).__init__() self._args = (file_pattern, min_bundle_size) +self._validate = validate def apply(self, pvalue): -return pvalue.pipeline | Read(_AvroSource(*self._args)) +return pvalue.pipeline | Read(_AvroSource(*self._args, + validate=self._validate)) class _AvroUtils(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index f0e88a6..8d7892a 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -65,7 +65,7 @@ input entails querying the table for all its rows. The coder argument on BigQuerySource controls the reading of the lines in the export files (i.e., transform a JSON object into a PCollection element). The coder is not involved when the same table is read as a side input since there is no intermediate -format involved. We get the table rows directly from the BigQuery service with +format involved. We get the table rows directly from the BigQuery service with a query. Users may provide a query to read from rather than reading all of a BigQuery http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 58ad118..c7bc27e 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -50,7 +50,8 @@ class FileBasedSource(iobase.BoundedSource): file_pattern, min_bundle_size=0, compression_type=fileio.CompressionTypes.AUTO, - splittable=True): + splittable=True, + validate=True): """Initializes ``FileBasedSource``. 
Args: @@ -68,10 +69,13 @@ class FileBasedSource(iobase.BoundedSource): the file, for example, for compressed files where currently it is not possible to efficiently read a data range without decompressing the whole file. + validate: Boolean flag to verify that the files exist during the pipeline +creation time. Raises: TypeError: when compression_type is not valid or if file_pattern is
[2/2] incubator-beam git commit: Closes #1220
Closes #1220 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/560fe79f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/560fe79f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/560fe79f Branch: refs/heads/python-sdk Commit: 560fe79f875b9d08b7256e6bf653a48b19f0ccb5 Parents: 15e78b2 76ad292 Author: Robert Bradshaw Authored: Mon Nov 14 15:41:00 2016 -0800 Committer: Robert Bradshaw Committed: Mon Nov 14 15:41:00 2016 -0800 -- sdks/python/apache_beam/io/avroio.py| 8 +++- sdks/python/apache_beam/io/bigquery.py | 2 +- sdks/python/apache_beam/io/filebasedsource.py | 16 +++- .../apache_beam/io/filebasedsource_test.py | 41 ++-- sdks/python/apache_beam/io/textio.py| 13 +-- 5 files changed, 60 insertions(+), 20 deletions(-) --
[2/2] incubator-beam git commit: Closes #1349
Closes #1349 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e78b28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e78b28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e78b28 Branch: refs/heads/python-sdk Commit: 15e78b28a63f0987d7e361f5f5b4c9b6be532316 Parents: 6fb4169 1fc9f70 Author: Robert Bradshaw Authored: Fri Nov 11 11:51:21 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 11:51:21 2016 -0800 -- sdks/python/apache_beam/utils/windowed_value.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Remove the inline from WindowedValue.create()
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6fb416989 -> 15e78b28a Remove the inline from WindowedValue.create() Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fc9f70b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fc9f70b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fc9f70b Branch: refs/heads/python-sdk Commit: 1fc9f70bfeec18d62a4141a44f3bbd40151efd85 Parents: 6fb4169 Author: Ahmet Altay Authored: Fri Nov 11 11:43:18 2016 -0800 Committer: Ahmet Altay Committed: Fri Nov 11 11:43:18 2016 -0800 -- sdks/python/apache_beam/utils/windowed_value.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc9f70b/sdks/python/apache_beam/utils/windowed_value.pxd -- diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd index 8799914..41c2986 100644 --- a/sdks/python/apache_beam/utils/windowed_value.pxd +++ b/sdks/python/apache_beam/utils/windowed_value.pxd @@ -34,5 +34,5 @@ cdef class WindowedValue(object): cdef inline bint _typed_eq(WindowedValue left, WindowedValue right) except? -2 @cython.locals(wv=WindowedValue) -cdef inline WindowedValue create( +cdef WindowedValue create( object value, int64_t timestamp_micros, object windows)
[1/2] incubator-beam git commit: Closes #1346
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 778194f22 -> 6fb416989 Closes #1346 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fb41698 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fb41698 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fb41698 Branch: refs/heads/python-sdk Commit: 6fb416989128519cb541bced107facae9012f51c Parents: 778194f 6f93cd5 Author: Robert Bradshaw Authored: Fri Nov 11 00:23:45 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 00:23:45 2016 -0800 -- .../examples/snippets/snippets_test.py | 14 ++--- sdks/python/apache_beam/pipeline_test.py| 5 +++ .../apache_beam/runners/direct/direct_runner.py | 2 +- .../apache_beam/runners/direct/executor.py | 33 .../runners/direct/transform_evaluator.py | 8 +++-- 5 files changed, 41 insertions(+), 21 deletions(-) --
[2/2] incubator-beam git commit: DirectPipelineRunner bug fixes.
DirectPipelineRunner bug fixes. - Execute empty [] | pipelines to the end. - use pickler to serialize/deserialize DoFns instead of deepcopy similar to the other execution environments. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58 Branch: refs/heads/python-sdk Commit: 6f93cd5884797c0880766c7737e106765becf96d Parents: 778194f Author: Ahmet Altay Authored: Thu Nov 10 17:37:45 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 00:23:45 2016 -0800 -- .../examples/snippets/snippets_test.py | 14 ++--- sdks/python/apache_beam/pipeline_test.py| 5 +++ .../apache_beam/runners/direct/direct_runner.py | 2 +- .../apache_beam/runners/direct/executor.py | 33 .../runners/direct/transform_evaluator.py | 8 +++-- 5 files changed, 41 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index edc0a17..72fccb2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -29,6 +29,8 @@ from apache_beam import io from apache_beam import pvalue from apache_beam import typehints from apache_beam.io import fileio +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to from apache_beam.utils.options import TypeOptions from apache_beam.examples.snippets import snippets @@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_on] def test_deterministic_key(self): -lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'] +p = 
beam.Pipeline('DirectPipelineRunner') +lines = (p | beam.Create( +['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) # [START type_hints_deterministic_key] class Player(object): @@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase): beam.typehints.Tuple[Player, int])) # [END type_hints_deterministic_key] -self.assertEquals( -{('banana', 3), ('kiwi', 4), ('zucchini', 3)}, -set(totals | beam.Map(lambda (k, v): (k.name, v +assert_that( +totals | beam.Map(lambda (k, v): (k.name, v)), +equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) + +p.run() class SnippetsTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index a4c983f..013796c 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.dataflow.native_io.iobase import NativeSource +from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Map @@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase): pipeline.run() + def test_aggregator_empty_input(self): +actual = [] | CombineGlobally(max).without_defaults() +self.assertEqual(actual, []) + def test_pipeline_as_context(self): def raise_exception(exn): raise exn http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py -- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2e5fe74..1afd486 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ 
b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -125,7 +125,7 @@ class BufferingInMemoryCache(object): for key, value in self._cache.iteritems(): applied_ptransform, tag = key self._pvalue_cache.cache_output(applied_ptransform, tag, value) - self._cache = None +self._cache = None class DirectPipelineResult(PipelineResult): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdk
[2/2] incubator-beam git commit: Closes #1330
Closes #1330 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/778194f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/778194f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/778194f2 Branch: refs/heads/python-sdk Commit: 778194f22ae850195c270991499f988f8fe50972 Parents: ec00c53 4827ae8 Author: Robert Bradshaw Authored: Thu Nov 10 14:05:02 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 10 14:05:02 2016 -0800 -- sdks/python/run_postcommit.sh | 3 +++ 1 file changed, 3 insertions(+) --
[1/2] incubator-beam git commit: Remove tox cache from previous workspace
Repository: incubator-beam Updated Branches: refs/heads/python-sdk ec00c530c -> 778194f22 Remove tox cache from previous workspace Jenkins doesn't clean up the previous workspace, and since .tox is in the gitignore, we must explicitly delete it. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4827ae84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4827ae84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4827ae84 Branch: refs/heads/python-sdk Commit: 4827ae840797eb43aa7e4265dad7112804b5fb85 Parents: ec00c53 Author: Vikas Kedigehalli Authored: Wed Nov 9 17:25:15 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 10 14:05:01 2016 -0800 -- sdks/python/run_postcommit.sh | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4827ae84/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 23dd516..2cd40da 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -31,6 +31,9 @@ set -v # pip install --user installation location. LOCAL_PATH=$HOME/.local/bin/ +# Remove any tox cache from previous workspace +rm -rf sdks/python/.tox + # INFRA does not install virtualenv pip install virtualenv --user
[GitHub] incubator-beam pull request #1326: Additional Coder tests
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/1326 Additional Coder tests I noticed these were missing while merging another PR. R: @mariapython You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam coder-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1326 commit 4f8c3060266fc857908187e365f8a44b49fdee00 Author: Robert Bradshaw Date: 2016-11-09T21:47:53Z Add a couple of missing coder tests. commit 3f8b2c7e52448798c166751cd74be54518650ee9 Author: Robert Bradshaw Date: 2016-11-09T21:56:31Z Also check coder determinism. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1253: Optimize WindowedValueCoder
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1253 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Closes #1253
Closes #1253 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec00c530 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec00c530 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec00c530 Branch: refs/heads/python-sdk Commit: ec00c530c9a54ce61095214fcce7b69a7c653d95 Parents: ea64242 0c90fb8 Author: Robert Bradshaw Authored: Wed Nov 9 13:26:45 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 9 13:26:45 2016 -0800 -- sdks/python/apache_beam/coders/coder_impl.pxd | 4 sdks/python/apache_beam/coders/coder_impl.py| 21 ++-- .../apache_beam/coders/coders_test_common.py| 8 ++-- 3 files changed, 21 insertions(+), 12 deletions(-) --
[1/2] incubator-beam git commit: Optimize WindowedValueCoder
Repository: incubator-beam Updated Branches: refs/heads/python-sdk ea642428f -> ec00c530c Optimize WindowedValueCoder Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80 Branch: refs/heads/python-sdk Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351 Parents: ea64242 Author: Robert Bradshaw Authored: Tue Nov 1 16:12:19 2016 -0700 Committer: Robert Bradshaw Committed: Wed Nov 9 13:26:44 2016 -0800 -- sdks/python/apache_beam/coders/coder_impl.pxd | 4 sdks/python/apache_beam/coders/coder_impl.py| 21 ++-- .../apache_beam/coders/coders_test_common.py| 8 ++-- 3 files changed, 21 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd -- diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index e021c2e..7ff 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -26,6 +26,7 @@ cimport libc.stdlib cimport libc.string from .stream cimport InputStream, OutputStream +from apache_beam.utils cimport windowed_value cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size @@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl): @cython.locals(c=CoderImpl) cpdef get_estimated_size_and_observables(self, value, bint nested=?) 
+ + @cython.locals(wv=windowed_value.WindowedValue) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py -- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index d075814..47a837f 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ from types import NoneType from apache_beam.coders import observable from apache_beam.utils.timestamp import Timestamp -from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils import windowed_value # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl): """A coder for windowed values.""" def __init__(self, value_coder, timestamp_coder, window_coder): +# TODO(robertwb): Do we need the ability to customize timestamp_coder? self._value_coder = value_coder self._timestamp_coder = timestamp_coder self._windows_coder = TupleSequenceCoderImpl(window_coder) def encode_to_stream(self, value, out, nested): -self._value_coder.encode_to_stream(value.value, out, True) -self._timestamp_coder.encode_to_stream(value.timestamp, out, True) -self._windows_coder.encode_to_stream(value.windows, out, True) +wv = value # type cast +self._value_coder.encode_to_stream(wv.value, out, True) +if isinstance(self._timestamp_coder, TimestampCoderImpl): + # Avoid creation of Timestamp object. 
+ out.write_bigendian_int64(wv.timestamp_micros) +else: + self._timestamp_coder.encode_to_stream(wv.timestamp, out, True) +self._windows_coder.encode_to_stream(wv.windows, out, True) def decode_from_stream(self, in_stream, nested): -return WindowedValue( +return windowed_value.create( self._value_coder.decode_from_stream(in_stream, True), -self._timestamp_coder.decode_from_stream(in_stream, True), +# Avoid creation of Timestamp object. +in_stream.read_bigendian_int64() +if isinstance(self._timestamp_coder, TimestampCoderImpl) +else self._timestamp_coder.decode_from_stream(in_stream, True).micros, self._windows_coder.decode_from_stream(in_stream, True)) def get_estimated_size_and_observables(self, value, nested=False): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 1af8347..adeb6a5 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -26,6 +26,7 @@ i
[GitHub] incubator-beam pull request #1323: Testing pr 1212
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1323 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---