[2/2] beam git commit: Closes #2586
Closes #2586 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4121ec49 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4121ec49 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4121ec49 Branch: refs/heads/master Commit: 4121ec490f2b3714cb668cc5811ba0bf88d610e5 Parents: e26bfbe c25380b Author: Robert Bradshaw Authored: Thu Apr 20 08:59:52 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:59:52 2017 -0700 -- .../examples/cookbook/bigquery_side_input.py | 2 +- .../apache_beam/examples/cookbook/filters.py | 2 +- sdks/python/apache_beam/io/concat_source_test.py | 2 +- .../python/apache_beam/io/filebasedsource_test.py | 18 +- sdks/python/apache_beam/io/sources_test.py | 2 +- sdks/python/apache_beam/io/tfrecordio_test.py | 8 sdks/python/apache_beam/pipeline_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 13 ++--- 8 files changed, 20 insertions(+), 29 deletions(-) --
[2/2] beam git commit: Closes #2604
Closes #2604 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e26bfbe0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e26bfbe0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e26bfbe0 Branch: refs/heads/master Commit: e26bfbe0ae867494cd63dc4fb04473f67ce102fa Parents: 7019aa7 815bbdc Author: Robert Bradshaw Authored: Thu Apr 20 08:57:07 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:57:07 2017 -0700 -- .../apache_beam/examples/cookbook/bigshuffle.py | 94 .../examples/cookbook/bigshuffle_test.py | 63 - 2 files changed, 157 deletions(-) --
[1/2] beam git commit: Remove bigshuffle from python examples
Repository: beam Updated Branches: refs/heads/master 7019aa70d -> e26bfbe0a Remove bigshuffle from python examples Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/815bbdcc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/815bbdcc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/815bbdcc Branch: refs/heads/master Commit: 815bbdccc6fd9f0cfcd3b9fb86476b61eb08b293 Parents: 7019aa7 Author: Vikas KedigehalliAuthored: Wed Apr 19 17:12:52 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:57:06 2017 -0700 -- .../apache_beam/examples/cookbook/bigshuffle.py | 94 .../examples/cookbook/bigshuffle_test.py| 63 - 2 files changed, 157 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/815bbdcc/sdks/python/apache_beam/examples/cookbook/bigshuffle.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py deleted file mode 100644 index 79cc85c..000 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""A BigShuffle workflow.""" - -from __future__ import absolute_import - -import argparse -import binascii -import logging - - -import apache_beam as beam -from apache_beam.io import ReadFromText -from apache_beam.io import WriteToText -from apache_beam.utils.pipeline_options import PipelineOptions -from apache_beam.utils.pipeline_options import SetupOptions - - -def crc32line(line): - return binascii.crc32(line) & 0x - - -def run(argv=None): - # pylint: disable=expression-not-assigned - - parser = argparse.ArgumentParser() - parser.add_argument('--input', - required=True, - help='Input file pattern to process.') - parser.add_argument('--output', - required=True, - help='Output file pattern to write results to.') - parser.add_argument('--checksum_output', - help='Checksum output file pattern.') - known_args, pipeline_args = parser.parse_known_args(argv) - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module level). - pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read the text file[pattern] into a PCollection. - lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder()) - - # Count the occurrences of each word. - output = (lines -| 'split' >> beam.Map( -lambda x: (x[:10], x[10:99])) -.with_output_types(beam.typehints.KV[str, str]) -| 'group' >> beam.GroupByKey() -| 'format' >> beam.FlatMap( -lambda (key, vals): ['%s%s' % (key, val) for val in vals])) - - # Write the output using a "Write" transform that has side effects. - output | WriteToText(known_args.output) - - # Optionally write the input and output checksums. 
- if known_args.checksum_output: -input_csum = (lines - | 'input-csum' >> beam.Map(crc32line) - | 'combine-input-csum' >> beam.CombineGlobally(sum) - | 'hex-format' >> beam.Map(lambda x: '%x' % x)) -input_csum | 'write-input-csum' >> WriteToText( -known_args.checksum_output + '-input') - -output_csum = (output - | 'output-csum' >> beam.Map(crc32line) - | 'combine-output-csum' >> beam.CombineGlobally(sum) - | 'hex-format-output' >> beam.Map(lambda x: '%x' % x)) -output_csum | 'write-output-csum' >> WriteToText( -known_args.checksum_output + '-output') - - # Actually run the pipeline (all operations above are deferred). - return p.run() - - -if
[1/6] beam git commit: Make stage names consistent.
Repository: beam Updated Branches: refs/heads/master 4e0d5f596 -> 7019aa70d Make stage names consistent. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209 Branch: refs/heads/master Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba Parents: 4e0d5f5 Author: Robert BradshawAuthored: Tue Apr 11 10:29:15 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:02 2017 -0700 -- sdks/python/apache_beam/io/gcp/bigquery.py | 6 +++--- .../io/gcp/datastore/v1/datastoreio.py | 6 +++--- sdks/python/apache_beam/io/iobase.py | 2 +- sdks/python/apache_beam/pipeline.py | 4 ++-- sdks/python/apache_beam/transforms/core.py | 19 ++- sdks/python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py| 4 ++-- 7 files changed, 22 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py -- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 4686518..891f62a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side table. 
The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading::: - main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource() - side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() + main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource() + side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource() results = ( main_table - | 'process data' >> beam.Map( + | 'ProcessData' >> beam.Map( lambda element, side_input: ..., AsList(side_table))) There is no difference in how main and side inputs are read. What makes the http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index d9b3598..a0ccbbb 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform): # outputs a ``PCollection[Entity]``. 
queries = (pcoll.pipeline - | 'User Query' >> Create([self._query]) - | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn( + | 'UserQuery' >> Create([self._query]) + | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn( self._project, self._query, self._datastore_namespace, self._num_splits))) sharded_queries = (queries | GroupByKey() | Values() - | 'flatten' >> FlatMap(lambda x: x)) + | 'Flatten' >> FlatMap(lambda x: x)) entities = sharded_queries | 'Read' >> ParDo( ReadFromDatastore.ReadFn(self._project, self._datastore_namespace)) http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py -- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index d9df5c4..2cac67f 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform): | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() | 'Extract' >> core.FlatMap(lambda x: x[1])) -return do_once | 'finalize_write' >> core.FlatMap( +return do_once | 'FinalizeWrite' >> core.FlatMap( _finalize_write, self.sink, AsSingleton(init_result_coll), http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 2ff9eb3..8e811bc 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -32,11 +32,11
[6/6] beam git commit: Closes #2552
Closes #2552 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7019aa70 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7019aa70 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7019aa70 Branch: refs/heads/master Commit: 7019aa70db62575ae275ae93155bb0903a1ff26e Parents: 4e0d5f5 2c34719 Author: Robert Bradshaw Authored: Thu Apr 20 08:55:06 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:06 2017 -0700 -- sdks/python/apache_beam/coders/coders.py | 16 - .../apache_beam/coders/coders_test_common.py | 7 ++-- sdks/python/apache_beam/io/gcp/bigquery.py | 6 ++-- .../io/gcp/datastore/v1/datastoreio.py | 6 ++-- sdks/python/apache_beam/io/iobase.py | 2 +- sdks/python/apache_beam/pipeline.py | 4 +-- sdks/python/apache_beam/runners/runner.py | 34 sdks/python/apache_beam/transforms/core.py | 23 +++-- .../python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 4 +-- sdks/python/apache_beam/transforms/trigger.py | 10 +++--- .../apache_beam/transforms/trigger_test.py | 10 +++--- sdks/python/apache_beam/transforms/window.py | 18 ++- 13 files changed, 58 insertions(+), 84 deletions(-) --
[2/6] beam git commit: Require deterministic window coders.
Require deterministic window coders. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2 Branch: refs/heads/master Commit: d98294c2bd13b45522ea584485bd62e900144c88 Parents: 72f5020 Author: Robert BradshawAuthored: Tue Apr 11 10:49:13 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:03 2017 -0700 -- sdks/python/apache_beam/coders/coders.py | 16 .../apache_beam/coders/coders_test_common.py | 1 - sdks/python/apache_beam/transforms/core.py| 4 sdks/python/apache_beam/transforms/window.py | 18 +- 4 files changed, 21 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py -- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 8ef0a46..4f75182 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -688,22 +688,6 @@ class IterableCoder(FastCoder): return hash((type(self), self._elem_coder)) -class WindowCoder(PickleCoder): - """Coder for windows in windowed values.""" - - def _create_impl(self): -return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads) - - def is_deterministic(self): -# Note that WindowCoder as implemented is not deterministic because the -# implementation simply pickles windows. See the corresponding comments -# on PickleCoder for more details. 
-return False - - def as_cloud_object(self): -return super(WindowCoder, self).as_cloud_object(is_pair_like=False) - - class GlobalWindowCoder(SingletonCoder): """Coder for global windows.""" http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 6491ea8..da0bde3 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase): coders.FastCoder, coders.ProtoCoder, coders.ToStringCoder, - coders.WindowCoder, coders.IntervalWindowCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d28eec..9f66c39 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1185,6 +1185,10 @@ class Windowing(object): else: raise ValueError( 'accumulation_mode must be provided for non-trivial triggers') +if not windowfn.get_window_coder().is_deterministic(): + raise ValueError( + 'window fn (%s) does not have a determanistic coder (%s)' % ( + window_fn, windowfn.get_window_coder())) self.windowfn = windowfn self.triggerfn = triggerfn self.accumulation_mode = accumulation_mode http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py -- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 931a17d..643cb99 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -49,6 +49,8 @@ WindowFn. 
from __future__ import absolute_import +import abc + from google.protobuf import struct_pb2 from google.protobuf import wrappers_pb2 @@ -93,6 +95,8 @@ class OutputTimeFn(object): class WindowFn(object): """An abstract windowing function defining a basic assign and merge.""" + __metaclass__ = abc.ABCMeta + class AssignContext(object): """Context passed to WindowFn.assign().""" @@ -100,6 +104,7 @@ class WindowFn(object): self.timestamp = Timestamp.of(timestamp) self.element = element + @abc.abstractmethod def assign(self, assign_context): """Associates a timestamp to an element.""" raise NotImplementedError @@
[3/6] beam git commit: Remove obsolete and unused Runner.clear
Remove obsolete and unused Runner.clear Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b9029ac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b9029ac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b9029ac Branch: refs/heads/master Commit: 4b9029ac3e965ed3629833138597f1fb365b0876 Parents: 68c0042 Author: Robert BradshawAuthored: Tue Apr 11 10:57:35 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:04 2017 -0700 -- sdks/python/apache_beam/runners/runner.py | 34 -- 1 file changed, 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/4b9029ac/sdks/python/apache_beam/runners/runner.py -- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index ccb066b..b35cb7b 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -99,8 +99,6 @@ class PipelineRunner(object): The base runner provides a run() method for visiting every node in the pipeline's DAG and executing the transforms computing the PValue in the node. - It also provides a clear() method for visiting every node and clearing out - the values contained in PValue objects produced during a run. A custom runner will typically provide implementations for some of the transform methods (ParDo, GroupByKey, Create, etc.). It may also @@ -129,38 +127,6 @@ class PipelineRunner(object): pipeline.visit(RunVisitor(self)) - def clear(self, pipeline, node=None): -"""Clear all nodes or nodes reachable from node of materialized values. - -Args: - pipeline: Pipeline object containing PValues to be cleared. - node: Optional node in the Pipeline processing DAG. If specified only -nodes reachable from this node will be cleared (ancestors of the node). - -This method is not intended (for now) to be called by users of Runner -objects. 
It is a hook for future layers on top of the current programming -model to control how much of the previously computed values are kept -around. Presumably an interactivity layer will use it. The simplest way -to change the behavior would be to define a runner that overwrites the -clear_pvalue() method since this method (runner.clear) will visit all -relevant nodes and call clear_pvalue on them. - -""" - -# Imported here to avoid circular dependencies. -# pylint: disable=wrong-import-order, wrong-import-position -from apache_beam.pipeline import PipelineVisitor - -class ClearVisitor(PipelineVisitor): - - def __init__(self, runner): -self.runner = runner - - def visit_value(self, value, _): -self.runner.clear_pvalue(value) - -pipeline.visit(ClearVisitor(self), node=node) - def apply(self, transform, input): """Runner callback for a pipeline.apply call.
[4/6] beam git commit: Enable IntervalWindowCoder test check.
Enable IntervalWindowCoder test check. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68c00426 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68c00426 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68c00426 Branch: refs/heads/master Commit: 68c00426261c54a32be509f9ab53cf3543d49d78 Parents: d98294c Author: Robert BradshawAuthored: Tue Apr 11 10:52:44 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:04 2017 -0700 -- sdks/python/apache_beam/coders/coders_test_common.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/68c00426/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index da0bde3..e5bfe35 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -61,8 +61,7 @@ class CodersTest(unittest.TestCase): standard -= set([coders.Coder, coders.FastCoder, coders.ProtoCoder, - coders.ToStringCoder, - coders.IntervalWindowCoder]) + coders.ToStringCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested @@ -171,6 +170,9 @@ class CodersTest(unittest.TestCase): *[window.IntervalWindow(x, y) for x in [-2**52, 0, 2**52] for y in range(-100, 100)]) +self.check_coder( +coders.TupleCoder((coders.IntervalWindowCoder(),)), +(window.IntervalWindow(0, 10),)) def test_timestamp_coder(self): self.check_coder(coders.TimestampCoder(),
[5/6] beam git commit: Rename AfterFirst to AfterAny for consistency with Java.
Rename AfterFirst to AfterAny for consistency with Java. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c347192 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c347192 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c347192 Branch: refs/heads/master Commit: 2c347192709e4076a1ac59b5ea769d4d3b2494f4 Parents: 4b9029a Author: Robert BradshawAuthored: Tue Apr 11 11:00:04 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:55:05 2017 -0700 -- sdks/python/apache_beam/transforms/trigger.py | 10 ++ sdks/python/apache_beam/transforms/trigger_test.py | 10 +- 2 files changed, 11 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger.py -- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 6a4cf24..b9786f4 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -186,7 +186,7 @@ class TriggerFn(object): def from_runner_api(proto, context): return { 'after_all': AfterAll, -'after_any': AfterFirst, +'after_any': AfterAny, 'after_each': AfterEach, 'after_end_of_window': AfterWatermark, # after_processing_time, after_synchronized_processing_time @@ -488,7 +488,8 @@ class ParallelTriggerFn(TriggerFn): in proto.after_all.subtriggers or proto.after_any.subtriggers] if proto.after_all.subtriggers: return AfterAll(*subtriggers) -return AfterFirst(*subtriggers) +else: + return AfterAny(*subtriggers) def to_runner_api(self, context): subtriggers = [ @@ -505,7 +506,7 @@ class ParallelTriggerFn(TriggerFn): raise NotImplementedError(self) -class AfterFirst(ParallelTriggerFn): +class AfterAny(ParallelTriggerFn): """Fires when any subtrigger fires. Also finishes when any subtrigger finishes. 
@@ -589,7 +590,8 @@ class AfterEach(TriggerFn): for subtrigger in self.triggers])) -class OrFinally(AfterFirst): +class OrFinally(AfterAny): + @staticmethod def from_runner_api(proto, context): return OrFinally( http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger_test.py -- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 9f2046a..914babb 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -33,7 +33,7 @@ from apache_beam.transforms.trigger import AccumulationMode from apache_beam.transforms.trigger import AfterAll from apache_beam.transforms.trigger import AfterCount from apache_beam.transforms.trigger import AfterEach -from apache_beam.transforms.trigger import AfterFirst +from apache_beam.transforms.trigger import AfterAny from apache_beam.transforms.trigger import AfterWatermark from apache_beam.transforms.trigger import DefaultTrigger from apache_beam.transforms.trigger import GeneralTriggerDriver @@ -217,7 +217,7 @@ class TriggerTest(unittest.TestCase): def test_fixed_after_first(self): self.run_trigger_simple( FixedWindows(10), # pyformat break -AfterFirst(AfterCount(2), AfterWatermark()), +AfterAny(AfterCount(2), AfterWatermark()), AccumulationMode.ACCUMULATING, [(1, 'a'), (2, 'b'), (3, 'c')], {IntervalWindow(0, 10): [set('ab')]}, @@ -225,7 +225,7 @@ class TriggerTest(unittest.TestCase): 2) self.run_trigger_simple( FixedWindows(10), # pyformat break -AfterFirst(AfterCount(5), AfterWatermark()), +AfterAny(AfterCount(5), AfterWatermark()), AccumulationMode.ACCUMULATING, [(1, 'a'), (2, 'b'), (3, 'c')], {IntervalWindow(0, 10): [set('abc')]}, @@ -236,7 +236,7 @@ class TriggerTest(unittest.TestCase): def test_repeatedly_after_first(self): self.run_trigger_simple( FixedWindows(100), # pyformat break -Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())), 
+Repeatedly(AfterAny(AfterCount(3), AfterWatermark())), AccumulationMode.ACCUMULATING, zip(range(7), 'abcdefg'), {IntervalWindow(0, 100): [ @@ -388,7 +388,7 @@ class RunnerApiTest(unittest.TestCase): for trigger_fn in ( DefaultTrigger(), AfterAll(AfterCount(1), AfterCount(10)), -AfterFirst(AfterCount(10), AfterCount(100)), +AfterAny(AfterCount(10), AfterCount(100)),
[1/2] beam git commit: [BEAM-662] Fix for allowing floating point periods in windows
Repository: beam Updated Branches: refs/heads/master 4e0c8333c -> 4e0d5f596 [BEAM-662] Fix for allowing floating point periods in windows Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc1bdd3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc1bdd3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc1bdd3 Branch: refs/heads/master Commit: 1bc1bdd33494b4123855e2e3c9fa823654b31998 Parents: 4e0c833 Author: Sourabh BajajAuthored: Wed Apr 19 18:20:11 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:53:55 2017 -0700 -- sdks/python/apache_beam/transforms/window.py | 10 ++ sdks/python/apache_beam/transforms/window_test.py | 14 ++ sdks/python/apache_beam/utils/timestamp.py| 4 3 files changed, 20 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window.py -- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 319a7b4..931a17d 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -388,13 +388,15 @@ class SlidingWindows(NonMergingWindowFn): raise ValueError('The size parameter must be strictly positive.') self.size = Duration.of(size) self.period = Duration.of(period) -self.offset = Timestamp.of(offset) % size +self.offset = Timestamp.of(offset) % period def assign(self, context): timestamp = context.timestamp -start = timestamp - (timestamp - self.offset) % self.period -return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size) -for s in range(start, start - self.size, -self.period)] +start = timestamp - ((timestamp - self.offset) % self.period) +return [ +IntervalWindow(Timestamp(micros=s), Timestamp(micros=s) + self.size) +for s in range(start.micros, timestamp.micros - self.size.micros, + -self.period.micros)] def __eq__(self, other): if type(self) == type(other) == 
SlidingWindows: http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window_test.py -- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 1ac95e4..cbfd0b2 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -108,6 +108,20 @@ class WindowTest(unittest.TestCase): self.assertEqual(expected, windowfn.assign(context('v', 8))) self.assertEqual(expected, windowfn.assign(context('v', 11))) + def test_sliding_windows_assignment_fraction(self): +windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5) +self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)], + windowfn.assign(context('v', 1.7))) +self.assertEqual([IntervalWindow(1.5, 5.0)], + windowfn.assign(context('v', 3))) + + def test_sliding_windows_assignment_fraction_large_offset(self): +windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0) +self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)], + windowfn.assign(context('v', 1.7))) +self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)], + windowfn.assign(context('v', 4.5))) + def test_sessions_merging(self): windowfn = Sessions(10) http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/utils/timestamp.py -- diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index 647f4bd..8b2ccda 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -167,10 +167,6 @@ class Duration(object): # Note that the returned value may have lost precision. return float(self.micros) / 100 - def __int__(self): -# Note that the returned value may have lost precision. -return self.micros / 100 - def __cmp__(self, other): # Allow comparisons between Duration and Timestamp values. if not isinstance(other, Timestamp):
[2/2] beam git commit: Closes #2600
Closes #2600 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e0d5f59 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e0d5f59 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e0d5f59 Branch: refs/heads/master Commit: 4e0d5f59691721b3b3b9766a294c1d6af5589e43 Parents: 4e0c833 1bc1bdd Author: Robert Bradshaw Authored: Thu Apr 20 08:53:56 2017 -0700 Committer: Robert Bradshaw Committed: Thu Apr 20 08:53:56 2017 -0700 -- sdks/python/apache_beam/transforms/window.py | 10 ++ sdks/python/apache_beam/transforms/window_test.py | 14 ++ sdks/python/apache_beam/utils/timestamp.py | 4 3 files changed, 20 insertions(+), 8 deletions(-) --
[GitHub] beam pull request #2595: [Beam-115] More complete translation of the graph t...
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2595 [Beam-115] More complete translation of the graph through the Runner API Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam py-runner-api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2595.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2595 commit d5b5c5ca23fce24e3c7028836641737609274596 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-18T22:29:04Z Per-transform runner api dispatch. commit dcf2bda4cbad8d2f1bbaa9f650fe59cd40522b5e Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-18T22:51:50Z Translate flatten to Runner API. commit d8d96772a60d53c4ffdaa480fd0bee575c8ae4df Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-18T23:07:32Z Translate WindowInto through the Runner API. commit 41a7bb060ecc46182252469fe2c055562a72a6a0 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-19T00:01:53Z Translate Reads through the Runner API. commit 05da3fd47701434f004a9d7ad8c5b80261d3c11d Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-19T05:46:25Z Factor out common URN registration. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2586: Remove vestigial Read and Write from core.py
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2586 Remove vestigial Read and Write from core.py Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam patch-7 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2586 commit e63aa0a104b89f0d0e24859227611e8f3235d559 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-18T23:23:51Z Remove vestigial Read and Write from core.py --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2552: Various cleanups
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2552 Various cleanups I accumulated these while working on other code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam cleanup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2552.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2552 commit 2fb4ce5798e8ad8f6255101a232dda5894dd9489 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T17:29:15Z Make stage names consistent. commit c56afa5854b62f081f1a132f7b7acd30ae8ff3b3 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T17:49:13Z Require deterministic window coders. commit 0fb2cabdb71eb5cf7d3ee42995906658eaf8e5d0 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T17:52:44Z Enable IntervalWindowCoder test check. commit 5a70fb678828db3687ce7ec0b1eedee5a2bce8b0 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T17:57:35Z Remove obsolete and unused Runner.clear commit 5629453d243c39ead63da5b9b75c122db660474b Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T18:00:04Z Rename AfterFirst to AfterAny for consistency with Java. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Ignore more python build artifacts.
Repository: beam Updated Branches: refs/heads/master 77712c936 -> a5a5bf946 Ignore more python build artifacts. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1683578 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1683578 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1683578 Branch: refs/heads/master Commit: a168357885b17bde688a548ed1fa12231a49658d Parents: 77712c9 Author: Robert BradshawAuthored: Tue Apr 11 11:49:16 2017 -0700 Committer: Robert Bradshaw Committed: Tue Apr 11 11:49:16 2017 -0700 -- .gitignore | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/a1683578/.gitignore -- diff --git a/.gitignore b/.gitignore index bc9f675..69946a9 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,9 @@ build/ dist/ distribute-* env/ +sdks/python/**/*.c +sdks/python/**/*.so +sdks/python/**/*.egg # Ignore IntelliJ files. .idea/
[2/2] beam git commit: Closes #2494
Closes #2494 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5a5bf94 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5a5bf94 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5a5bf94 Branch: refs/heads/master Commit: a5a5bf946f69d75897279f66aeeb0dcea3babfb2 Parents: 77712c9 a168357 Author: Robert Bradshaw Authored: Tue Apr 11 12:12:52 2017 -0700 Committer: Robert Bradshaw Committed: Tue Apr 11 12:12:52 2017 -0700 -- .gitignore | 3 +++ 1 file changed, 3 insertions(+) --
[GitHub] beam pull request #2494: Ignore more python build artifacts.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2494 Ignore more python build artifacts. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam git-ignore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2494.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2494 commit a168357885b17bde688a548ed1fa12231a49658d Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-11T18:49:16Z Ignore more python build artifacts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2437: Use absolute import for dataflow iobase test.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2437 Use absolute import for dataflow iobase test. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam patch-6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2437.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2437 commit 1ae5fee58bc494bc072cde5d6e6cc7f7cda69e4b Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-05T18:55:20Z Use absolute import for dataflow iobase test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Closes #2435
Repository: beam Updated Branches: refs/heads/master bc907c58b -> e32a0252f Closes #2435 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e32a0252 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e32a0252 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e32a0252 Branch: refs/heads/master Commit: e32a0252fca4ac7011b555d1aa14b1e098ac52a5 Parents: bc907c5 3eef246 Author: Robert BradshawAuthored: Wed Apr 5 10:46:29 2017 -0700 Committer: Robert Bradshaw Committed: Wed Apr 5 10:46:29 2017 -0700 -- sdks/python/apache_beam/examples/wordcount.py | 32 ++-- .../apache_beam/internal/gcp/json_value.py | 6 - sdks/python/apache_beam/io/filebasedsource.py | 54 ++ .../apache_beam/io/filebasedsource_test.py | 24 --- sdks/python/apache_beam/io/fileio.py| 56 ++- sdks/python/apache_beam/io/fileio_test.py | 45 ++--- .../runners/dataflow/internal/apiclient.py | 1 - .../apache_beam/runners/direct/direct_runner.py | 9 - sdks/python/apache_beam/transforms/display.py | 1 - .../apache_beam/transforms/display_test.py | 36 .../apache_beam/utils/pipeline_options.py | 92 +-- .../apache_beam/utils/pipeline_options_test.py | 52 +- sdks/python/apache_beam/utils/value_provider.py | 110 - .../apache_beam/utils/value_provider_test.py| 165 --- 14 files changed, 65 insertions(+), 618 deletions(-) --
[2/2] beam git commit: Revert "Add ValueProvider class for FileBasedSource I/O Transforms"
Revert "Add ValueProvider class for FileBasedSource I/O Transforms" This reverts commit 1e2168a127fb3047fb15d231a001bbf951892e11. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eef246f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eef246f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eef246f Branch: refs/heads/master Commit: 3eef246f761f92c626541e9008f8624b43bdcc09 Parents: bc907c5 Author: Ahmet AltayAuthored: Wed Apr 5 10:03:14 2017 -0700 Committer: Robert Bradshaw Committed: Wed Apr 5 10:46:29 2017 -0700 -- sdks/python/apache_beam/examples/wordcount.py | 32 ++-- .../apache_beam/internal/gcp/json_value.py | 6 - sdks/python/apache_beam/io/filebasedsource.py | 54 ++ .../apache_beam/io/filebasedsource_test.py | 24 --- sdks/python/apache_beam/io/fileio.py| 56 ++- sdks/python/apache_beam/io/fileio_test.py | 45 ++--- .../runners/dataflow/internal/apiclient.py | 1 - .../apache_beam/runners/direct/direct_runner.py | 9 - sdks/python/apache_beam/transforms/display.py | 1 - .../apache_beam/transforms/display_test.py | 36 .../apache_beam/utils/pipeline_options.py | 92 +-- .../apache_beam/utils/pipeline_options_test.py | 52 +- sdks/python/apache_beam/utils/value_provider.py | 110 - .../apache_beam/utils/value_provider_test.py| 165 --- 14 files changed, 65 insertions(+), 618 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/3eef246f/sdks/python/apache_beam/examples/wordcount.py -- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 27b9dcb..50c0328 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -19,6 +19,7 @@ from __future__ import absolute_import +import argparse import logging import re @@ -66,29 +67,24 @@ class WordExtractingDoFn(beam.DoFn): def run(argv=None): """Main entry point; defines and runs the wordcount pipeline.""" - class 
WordcountOptions(PipelineOptions): -@classmethod -def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - '--input', - dest='input', - default='gs://dataflow-samples/shakespeare/kinglear.txt', - help='Input file to process.') - parser.add_value_provider_argument( - '--output', - dest='output', - required=True, - help='Output file to write results to.') - pipeline_options = PipelineOptions(argv) - wordcount_options = pipeline_options.view_as(WordcountOptions) - + parser = argparse.ArgumentParser() + parser.add_argument('--input', + dest='input', + default='gs://dataflow-samples/shakespeare/kinglear.txt', + help='Input file to process.') + parser.add_argument('--output', + dest='output', + required=True, + help='Output file to write results to.') + known_args, pipeline_args = parser.parse_known_args(argv) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True p = beam.Pipeline(options=pipeline_options) # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> ReadFromText(wordcount_options.input) + lines = p | 'read' >> ReadFromText(known_args.input) # Count the occurrences of each word. counts = (lines @@ -103,7 +99,7 @@ def run(argv=None): # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(wordcount_options.output) + output | 'write' >> WriteToText(known_args.output) # Actually run the pipeline (all operations above are deferred). 
result = p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/3eef246f/sdks/python/apache_beam/internal/gcp/json_value.py -- diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py index 4099c1a..c8b5393 100644 --- a/sdks/python/apache_beam/internal/gcp/json_value.py +++ b/sdks/python/apache_beam/internal/gcp/json_value.py @@ -25,8 +25,6 @@ except ImportError: extra_types = None # pylint: enable=wrong-import-order, wrong-import-position -from apache_beam.utils.value_provider import
[1/2] beam git commit: Closes #2432
Repository: beam Updated Branches: refs/heads/master e2a2836ad -> 8e5cfdea9 Closes #2432 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e5cfdea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e5cfdea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e5cfdea Branch: refs/heads/master Commit: 8e5cfdea9d331b2bfac0dd054bafc7f3f5295b48 Parents: e2a2836 5436081 Author: Robert Bradshaw Authored: Wed Apr 5 09:54:48 2017 -0700 Committer: Robert Bradshaw Committed: Wed Apr 5 09:54:48 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 1 + sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++ 2 files changed, 7 insertions(+) --
[2/2] beam git commit: Update refcounts after pipeline reconstruction.
Update refcounts after pipeline reconstruction. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54360814 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54360814 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54360814 Branch: refs/heads/master Commit: 54360814fbc59f69e10f4bac52cfeeea5e044cf9 Parents: e2a2836 Author: Robert BradshawAuthored: Wed Apr 5 08:37:33 2017 -0700 Committer: Robert Bradshaw Committed: Wed Apr 5 09:54:48 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 1 + sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++ 2 files changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 8506b85..fdb9a9d 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -546,4 +546,5 @@ class AppliedPTransform(object): if pc not in result.inputs: pc.producer = result pc.tag = tag +result.update_input_refcounts() return result http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/transforms/ptransform_test.py -- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 37ff2a8..5889ab5 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -435,6 +435,12 @@ class PTransformTest(unittest.TestCase): assert_that(result, equal_to([])) pipeline.run() + def test_flatten_same_pcollections(self): +pipeline = TestPipeline() +pc = pipeline | beam.Create(['a', 'b']) +assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3)) +pipeline.run() + def test_flatten_pcollections_in_iterable(self): pipeline = TestPipeline() pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
[GitHub] beam pull request #2432: Update refcounts after pipeline reconstruction.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2432 Update refcounts after pipeline reconstruction. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam flatten Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2432.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2432 commit 6a337db9264b9179d0c77aff4d94e89267be838d Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-05T15:37:33Z Update refcounts after pipeline reconstruction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Closes #2400
Closes #2400 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50fc63a9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50fc63a9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50fc63a9 Branch: refs/heads/master Commit: 50fc63a9b52c9c6f20082ed68ee3dc47966e0d32 Parents: 22ff898 a41 Author: Robert BradshawAuthored: Tue Apr 4 11:26:54 2017 -0700 Committer: Robert Bradshaw Committed: Tue Apr 4 11:26:54 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 5 + 1 file changed, 5 insertions(+) --
[1/2] beam git commit: Avoid Runner API translation of pipelines with PDone.
Repository: beam Updated Branches: refs/heads/master 22ff898a3 -> 50fc63a9b Avoid Runner API translation of pipelines with PDone. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a419 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a419 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a419 Branch: refs/heads/master Commit: a419a8e58cafced10232ff56094a58fe17ce Parents: 22ff898 Author: Robert BradshawAuthored: Sat Apr 1 12:42:40 2017 -0700 Committer: Robert Bradshaw Committed: Tue Apr 4 11:26:53 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/a419/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 3c416eb..8506b85 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -318,6 +318,11 @@ class Pipeline(object): pickler.loads(pickler.dumps(transform_node.transform)) except Exception: Visitor.ok = False + + def visit_value(self, value, _): +if isinstance(value, pvalue.PDone): + Visitor.ok = False + self.visit(Visitor()) return Visitor.ok
[GitHub] beam pull request #2400: [BEAM-1843] Avoid Runner API translation of pipelines with PDone.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2400 [BEAM-1843] Avoid Runner API translation of pipelines with PDone. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam pdone Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2400.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2400 commit c4ac34dc40387b752c73a7540c09cda2ceff6833 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-04-01T19:42:40Z Avoid Runner API translation of pipelines with PDone. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Fix side inputs on dataflow runner.
Fix side inputs on dataflow runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07daf3a5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07daf3a5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07daf3a5 Branch: refs/heads/master Commit: 07daf3a54544ce842165ffe15264e43ebced28ba Parents: 60901f8 Author: Robert BradshawAuthored: Fri Mar 31 14:57:44 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 21:45:16 2017 -0700 -- .../apache_beam/runners/dataflow/dataflow_runner.py| 13 +++-- 1 file changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/07daf3a5/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index db433df..fe9f8c0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -287,7 +287,7 @@ class DataflowRunner(PipelineRunner): def _add_singleton_step(self, label, full_label, tag, input_step): """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" # Import here to avoid adding the dependency for local running scenarios. 
-from google.cloud.dataflow.internal import apiclient +from apache_beam.runners.dataflow.internal import apiclient step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label) self.job.proto.steps.append(step.proto) step.add_property(PropertyNames.USER_NAME, full_label) @@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner): [{PropertyNames.USER_NAME: ( '%s.%s' % (full_label, PropertyNames.OUTPUT)), PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}]) + PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) return step def run_Flatten(self, transform_node): @@ -374,12 +374,10 @@ class DataflowRunner(PipelineRunner): si_dict = {} # We must call self._cache.get_pvalue exactly once due to refcounting. si_labels = {} -for side_pval in transform_node.side_inputs: - si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name lookup_label = lambda side_pval: si_labels[side_pval] for side_pval in transform_node.side_inputs: assert isinstance(side_pval, AsSideInput) - si_label = self._get_unique_step_name() + si_label = 'SideInput-' + self._get_unique_step_name() si_full_label = '%s/%s' % (transform_node.full_label, si_label) self._add_singleton_step( si_label, si_full_label, side_pval.pvalue.tag, @@ -388,10 +386,13 @@ class DataflowRunner(PipelineRunner): '@type': 'OutputReference', PropertyNames.STEP_NAME: si_label, PropertyNames.OUTPUT_NAME: PropertyNames.OUT} + si_labels[side_pval] = si_label # Now create the step for the ParDo transform being handled. step = self._add_step( -TransformNames.DO, transform_node.full_label, transform_node, +TransformNames.DO, +transform_node.full_label + '/Do' if transform_node.side_inputs else '', +transform_node, transform_node.transform.side_output_tags) fn_data = self._pardo_fn_data(transform_node, lookup_label) step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
[1/2] beam git commit: Closes #2395
Repository: beam Updated Branches: refs/heads/master 60901f876 -> 03dce6dcc Closes #2395 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03dce6dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03dce6dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03dce6dc Branch: refs/heads/master Commit: 03dce6dccef50210dd420cb9f79baf063bc0b0e8 Parents: 60901f8 07daf3a Author: Robert Bradshaw Authored: Fri Mar 31 21:45:16 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 21:45:16 2017 -0700 -- .../apache_beam/runners/dataflow/dataflow_runner.py| 13 +++-- 1 file changed, 7 insertions(+), 6 deletions(-) --
[2/3] beam git commit: Only encode PCollection outputs in Runner API protos.
Only encode PCollection outputs in Runner API protos. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ff44af Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ff44af Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ff44af Branch: refs/heads/master Commit: c9ff44afa3fcb47b7f0c4288f4f7d520f063d442 Parents: 0749982 Author: Robert BradshawAuthored: Fri Mar 31 16:57:01 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 21:44:21 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c9ff44af/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index ee5904b..0841e5f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -494,6 +494,10 @@ class AppliedPTransform(object): return {str(ix): input for ix, input in enumerate(self.inputs) if isinstance(input, pvalue.PCollection)} + def named_outputs(self): +return {str(tag): output for tag, output in self.outputs.items() +if isinstance(output, pvalue.PCollection)} + def to_runner_api(self, context): from apache_beam.runners.api import beam_runner_api_pb2 return beam_runner_api_pb2.PTransform( @@ -507,7 +511,7 @@ class AppliedPTransform(object): inputs={tag: context.pcollections.get_id(pc) for tag, pc in self.named_inputs().items()}, outputs={str(tag): context.pcollections.get_id(out) - for tag, out in self.outputs.items()}, + for tag, out in self.named_outputs().items()}, # TODO(BEAM-115): display_data display_data=None)
[3/3] beam git commit: Closes #2396
Closes #2396 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/60901f87 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/60901f87 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/60901f87 Branch: refs/heads/master Commit: 60901f876aaf9e85c11501557516f9d47b87d994 Parents: 0749982 bd40c5a Author: Robert Bradshaw Authored: Fri Mar 31 21:44:22 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 21:44:22 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) --
[1/3] beam git commit: Ensure transforms are picklable before materializing to protos.
Repository: beam Updated Branches: refs/heads/master 07499824b -> 60901f876 Ensure transforms are picklable before materializing to protos. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd40c5a2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd40c5a2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd40c5a2 Branch: refs/heads/master Commit: bd40c5a27e379914914a30f79854fd7a38621c66 Parents: c9ff44a Author: Robert BradshawAuthored: Fri Mar 31 16:57:22 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 21:44:21 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/bd40c5a2/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 0841e5f..3c416eb 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -311,6 +311,12 @@ class Pipeline(object): def visit_transform(self, transform_node): if transform_node.side_inputs: + # No side inputs (yet). + Visitor.ok = False +try: + # Transforms must be picklable. + pickler.loads(pickler.dumps(transform_node.transform)) +except Exception: Visitor.ok = False self.visit(Visitor()) return Visitor.ok
[GitHub] beam pull request #2396: [BEAM-1843] Ensure transforms are picklable before materializing to protos.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2396 [BEAM-1843] Ensure transforms are picklable before materializing to protos. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam native-write Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2396.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2396 commit c6abd8ea8260ff51614d9b309975975adc4cf32e Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-31T23:30:56Z Ensure transforms are picklable before materializing to protos. See BEAM-1843. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2395: [BEAM-1853] Fix side inputs on dataflow runner.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2395 [BEAM-1853] Fix side inputs on dataflow runner. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam side-inputs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2395.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2395 commit 3aa157a9be4f5bf8e390fcb3acf37d8bdb250954 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-31T21:57:44Z Fix side inputs on dataflow runner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Closes #2388
Repository: beam Updated Branches: refs/heads/master 132d3c5f6 -> affb926cc Closes #2388 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/affb926c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/affb926c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/affb926c Branch: refs/heads/master Commit: affb926ccd9302734805d8c0db418d006ee37672 Parents: 132d3c5 207de81 Author: Robert BradshawAuthored: Fri Mar 31 12:11:00 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 31 12:11:00 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 12 +- sdks/python/apache_beam/pvalue.py | 258 +++ sdks/python/apache_beam/pvalue_test.py | 33 --- .../runners/dataflow/dataflow_runner.py | 29 ++- .../runners/direct/bundle_factory.py| 3 +- .../consumer_tracking_pipeline_visitor.py | 11 +- .../consumer_tracking_pipeline_visitor_test.py | 4 +- .../runners/direct/evaluation_context.py| 60 +++-- .../apache_beam/runners/direct/executor.py | 7 +- .../runners/direct/transform_evaluator.py | 51 +--- sdks/python/apache_beam/transforms/core.py | 2 +- .../python/apache_beam/transforms/ptransform.py | 4 +- .../python/apache_beam/transforms/sideinputs.py | 132 -- .../apache_beam/transforms/sideinputs_test.py | 91 +++ 14 files changed, 198 insertions(+), 499 deletions(-) --
[2/2] beam git commit: Change side inputs to be references rather than full PValues.
Change side inputs to be references rather than full PValues. This is more consistent with the Runner API's structure. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/207de81b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/207de81b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/207de81b Branch: refs/heads/master Commit: 207de81bca4c3761cf663d32f9b95a022ef97165 Parents: 132d3c5 Author: Robert Bradshaw <rober...@gmail.com> Authored: Thu Mar 30 08:20:21 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Mar 31 12:11:00 2017 -0700 -- sdks/python/apache_beam/pipeline.py | 12 +- sdks/python/apache_beam/pvalue.py | 258 +++ sdks/python/apache_beam/pvalue_test.py | 33 --- .../runners/dataflow/dataflow_runner.py | 29 ++- .../runners/direct/bundle_factory.py| 3 +- .../consumer_tracking_pipeline_visitor.py | 11 +- .../consumer_tracking_pipeline_visitor_test.py | 4 +- .../runners/direct/evaluation_context.py| 60 +++-- .../apache_beam/runners/direct/executor.py | 7 +- .../runners/direct/transform_evaluator.py | 51 +--- sdks/python/apache_beam/transforms/core.py | 2 +- .../python/apache_beam/transforms/ptransform.py | 4 +- .../python/apache_beam/transforms/sideinputs.py | 132 -- .../apache_beam/transforms/sideinputs_test.py | 91 +++ 14 files changed, 198 insertions(+), 499 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index be2a79d..ee5904b 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -142,9 +142,6 @@ class Pipeline(object): # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. self.applied_labels = set() -# Store cache of views created from PCollections. For reference, see -# pvalue._cache_view(). 
-self._view_cache = {} def _current_transform(self): """Returns the transform currently on the top of the stack.""" @@ -271,8 +268,8 @@ class Pipeline(object): result.producer = current # TODO(robertwb): Multi-input, multi-output inference. # TODO(robertwb): Ideally we'd do intersection here. - if (type_options is not None and type_options.pipeline_type_check and - isinstance(result, (pvalue.PCollection, pvalue.PCollectionView)) + if (type_options is not None and type_options.pipeline_type_check + and isinstance(result, pvalue.PCollection) and not result.element_type): input_element_type = ( inputs[0].element_type @@ -416,7 +413,7 @@ class AppliedPTransform(object): if not isinstance(main_input, pvalue.PBegin): real_producer(main_input).refcounts[main_input.tag] += 1 for side_input in self.side_inputs: -real_producer(side_input).refcounts[side_input.tag] += 1 +real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1 def add_output(self, output, tag=None): if isinstance(output, pvalue.DoOutputsTuple): @@ -456,7 +453,8 @@ class AppliedPTransform(object): # Visit side inputs. for pval in self.side_inputs: - if isinstance(pval, pvalue.PCollectionView) and pval not in visited: + if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited: +pval = pval.pvalue # Unpack marker-object-wrapped pvalue. assert pval.producer is not None pval.producer.visit(visitor, pipeline, visited) # The value should be visited now since we visit outputs too. http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue.py -- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 4114b3f..bfe1745 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -26,9 +26,10 @@ produced when the pipeline gets executed. from __future__ import absolute_import -import collections import itertools +from apache_beam import typehints + class PValue(object): """Base class for PCollection. 
@@ -250,20 +251,22 @@ class SideOutputValue(object): self.value = value -class PCollectionView(PValue): - """An immutable view of a PCollection that can be used as a side input.""" +class AsSideInput(object): + ""&qu
[GitHub] beam pull request #2388: Change side inputs to be references rather than ful...
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2388 Change side inputs to be references rather than full PValues. This is more consistent with the Runner API's structure. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam side-inputs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2388.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2388 commit 85c60920325b8c06f0120fa8be2a2020d4cbdff4 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-30T15:20:21Z Change side inputs to be references rather than full PValues. This is more consistent with the Runner API's structure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Translate pipeline graph to and from Runner API protos.
lf.options.view_as(SetupOptions).save_main_session: # If this option is chosen, verify we can pickle the main session early. tmpdir = tempfile.mkdtemp() @@ -299,6 +308,42 @@ class Pipeline(object): self.transforms_stack.pop() return pvalueish_result + def _verify_runner_api_compatible(self): +class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment + ok = True # Really a nonlocal. + + def visit_transform(self, transform_node): +if transform_node.side_inputs: + Visitor.ok = False +self.visit(Visitor()) +return Visitor.ok + + def to_runner_api(self): +from apache_beam.runners import pipeline_context +from apache_beam.runners.api import beam_runner_api_pb2 +context = pipeline_context.PipelineContext() +# Mutates context; placing inline would force dependence on +# argument evaluation order. +root_transform_id = context.transforms.get_id(self._root_transform()) +proto = beam_runner_api_pb2.Pipeline( +root_transform_id=root_transform_id, +components=context.to_runner_api()) +return proto + + @staticmethod + def from_runner_api(proto, runner, options): +p = Pipeline(runner=runner, options=options) +from apache_beam.runners import pipeline_context +context = pipeline_context.PipelineContext(proto.components) +p.transforms_stack = [ +context.transforms.get_by_id(proto.root_transform_id)] +# TODO(robertwb): These are only needed to continue construction. Omit? +p.applied_labels = set([ +t.unique_name for t in proto.components.transforms.values()]) +for id in proto.components.pcollections: + context.pcollections.get_by_id(id).pipeline = p +return p + class PipelineVisitor(object): """Visitor pattern class used to traverse a DAG of transforms. 
@@ -374,12 +419,16 @@ class AppliedPTransform(object): real_producer(side_input).refcounts[side_input.tag] += 1 def add_output(self, output, tag=None): -assert (isinstance(output, pvalue.PValue) or -isinstance(output, pvalue.DoOutputsTuple)) -if tag is None: - tag = len(self.outputs) -assert tag not in self.outputs -self.outputs[tag] = output +if isinstance(output, pvalue.DoOutputsTuple): + self.add_output(output[output._main_tag]) +elif isinstance(output, pvalue.PValue): + # TODO(BEAM-1833): Require tags when calling this method. + if tag is None and None in self.outputs: +tag = len(self.outputs) + assert tag not in self.outputs + self.outputs[tag] = output +else: + raise TypeError("Unexpected output type: %s" % output) def add_part(self, part): assert isinstance(part, AppliedPTransform) @@ -441,3 +490,47 @@ class AppliedPTransform(object): if v not in visited: visited.add(v) visitor.visit_value(v, self) + + def named_inputs(self): +# TODO(BEAM-1833): Push names up into the sdk construction. +return {str(ix): input for ix, input in enumerate(self.inputs) +if isinstance(input, pvalue.PCollection)} + + def to_runner_api(self, context): +from apache_beam.runners.api import beam_runner_api_pb2 +return beam_runner_api_pb2.PTransform( +unique_name=self.full_label, +spec=beam_runner_api_pb2.UrnWithParameter( +urn=urns.PICKLED_TRANSFORM, +parameter=proto_utils.pack_Any( +wrappers_pb2.BytesValue(value=pickler.dumps(self.transform, +subtransforms=[context.transforms.get_id(part) for part in self.parts], +# TODO(BEAM-115): Side inputs. 
+inputs={tag: context.pcollections.get_id(pc) +for tag, pc in self.named_inputs().items()}, +outputs={str(tag): context.pcollections.get_id(out) + for tag, out in self.outputs.items()}, +# TODO(BEAM-115): display_data +display_data=None) + + @staticmethod + def from_runner_api(proto, context): +result = AppliedPTransform( +parent=None, +transform=pickler.loads( +proto_utils.unpack_Any(proto.spec.parameter, + wrappers_pb2.BytesValue).value), +full_label=proto.unique_name, +inputs=[ +context.pcollections.get_by_id(id) for id in proto.inputs.values()]) +result.parts = [ +context.transforms.get_by_id(id) for id in proto.subtransforms] +result.outputs = { +None if tag == 'None' else tag: context.pcollections.get_by_id(id) +for tag, id in proto.outputs.items()} +if not result.parts: + for tag, pc in result.outputs.items(): +if pc not in result.inputs: + pc.producer = result + pc.tag = tag +return result http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline_test.py -- diff --git
[GitHub] beam pull request #2318: Add link to contributions guides to github README
Github user robertwb closed the pull request at: https://github.com/apache/beam/pull/2318 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Allow customizing the GCE metadata service URL.
Repository: beam Updated Branches: refs/heads/master 9b8e16fca -> c80ef1837 Allow customizing the GCE metadata service URL. The goal here is to allow a user to customize where a job finds the metadata service; it would also be possible to do this programmatically (eg expose a variable), but making it an environment variable allows a caller to do this without need to do so in-process. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7453766b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7453766b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7453766b Branch: refs/heads/master Commit: 7453766b0045fabcb8e6e683b30f7fa1a20b334c Parents: 9b8e16f Author: Craig CitroAuthored: Fri Mar 24 13:38:50 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 24 14:47:19 2017 -0700 -- sdks/python/apache_beam/internal/gcp/auth.py | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/7453766b/sdks/python/apache_beam/internal/gcp/auth.py -- diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index ccc67c6..8304658 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -86,9 +86,11 @@ class GCEMetadataCredentials(OAuth2Credentials): retry_filter=retry.retry_on_server_errors_and_timeout_filter) def _refresh(self, http_request): refresh_time = datetime.datetime.now() -req = urllib2.Request('http://metadata.google.internal/computeMetadata/v1/' - 'instance/service-accounts/default/token', - headers={'Metadata-Flavor': 'Google'}) +metadata_root = os.environ.get( +'GCE_METADATA_ROOT', 'metadata.google.internal') +token_url = ('http://{}/computeMetadata/v1/instance/service-accounts/' + 'default/token').format(metadata_root) +req = urllib2.Request(token_url, headers={'Metadata-Flavor': 'Google'}) token_data = 
json.loads(urllib2.urlopen(req).read()) self.access_token = token_data['access_token'] self.token_expiry = (refresh_time +
[2/2] beam git commit: Closes #2319
Closes #2319 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c80ef183 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c80ef183 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c80ef183 Branch: refs/heads/master Commit: c80ef18376cb8c5b2cf8c1d9ac6000386d1eceb2 Parents: 9b8e16f 7453766 Author: Robert Bradshaw Authored: Fri Mar 24 14:47:20 2017 -0700 Committer: Robert Bradshaw Committed: Fri Mar 24 14:47:20 2017 -0700 -- sdks/python/apache_beam/internal/gcp/auth.py | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) --
[GitHub] beam pull request #2318: Add link to contributions guides to github README
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2318 Add link to contributions guides to github README Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam patch-5 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2318.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2318 commit 950c0a36bc7e8046e67ee22089af902e40f24f4a Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-24T20:36:36Z Add link to contributions guides to github README --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2296: [BEAM-115] Translate pipeline graph to and from Run...
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2296 [BEAM-115] Translate pipeline graph to and from Runner API protos. There are some caveats: * Specific known transforms, with their payloads, are not yet translated. * Side inputs are not yet supported. All pipelines without side inputs are passed through this translation by default before execution. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam py-runner-api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2296.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2296 commit ebc74e9678050c632574b11afebf250b8332c25e Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-20T23:08:37Z Translate pipeline graph to and from Runner API protos. There are some caveats: * Specific known transforms, with their payloads, are not yet translated. * Side inputs are not yet supported. All pipelines without side inputs are passed through this translation by default before execution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #176: Update quickstart-java.md
Github user robertwb closed the pull request at: https://github.com/apache/beam-site/pull/176 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/3] beam git commit: Fix bad test obscured by falsely passing assert_that.
Fix bad test obscured by falsely passing assert_that. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47da10cf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47da10cf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47da10cf Branch: refs/heads/master Commit: 47da10cfc21e3344bd198e572528a02fcf1caec3 Parents: c39b02d Author: Robert BradshawAuthored: Tue Mar 21 00:41:06 2017 -0700 Committer: Robert Bradshaw Committed: Tue Mar 21 10:38:37 2017 -0700 -- sdks/python/apache_beam/transforms/window_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/47da10cf/sdks/python/apache_beam/transforms/window_test.py -- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 99be02c..11c8a68 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -142,7 +142,7 @@ class WindowTest(unittest.TestCase): def timestamped_key_values(self, pipeline, key, *timestamps): return (pipeline | 'start' >> Create(timestamps) -| Map(lambda x: WindowedValue((key, x), x, []))) +| Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()]))) def test_sliding_windows(self): p = TestPipeline()
[1/3] beam git commit: Closes #2279
Repository: beam Updated Branches: refs/heads/master 030528f3d -> bea4f5aec Closes #2279 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bea4f5ae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bea4f5ae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bea4f5ae Branch: refs/heads/master Commit: bea4f5aecce80c398e6a73f62263b454642826f1 Parents: 030528f 47da10c Author: Robert Bradshaw Authored: Tue Mar 21 10:38:37 2017 -0700 Committer: Robert Bradshaw Committed: Tue Mar 21 10:38:37 2017 -0700 -- sdks/python/apache_beam/transforms/util.py | 18 --- sdks/python/apache_beam/transforms/util_test.py | 50 .../apache_beam/transforms/window_test.py | 2 +- 3 files changed, 63 insertions(+), 7 deletions(-) --
[3/3] beam git commit: [BEAM-1768] Fix assert_that for empty inputs
[BEAM-1768] Fix assert_that for empty inputs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c39b02d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c39b02d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c39b02d9 Branch: refs/heads/master Commit: c39b02d9ea2d343f348beca1228776204605fb93 Parents: 030528f Author: Robert BradshawAuthored: Mon Mar 20 18:20:01 2017 -0700 Committer: Robert Bradshaw Committed: Tue Mar 21 10:38:37 2017 -0700 -- sdks/python/apache_beam/transforms/util.py | 18 --- sdks/python/apache_beam/transforms/util_test.py | 50 2 files changed, 62 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c39b02d9/sdks/python/apache_beam/transforms/util.py -- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index e3f5b85..ac7eb3c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -22,6 +22,7 @@ from __future__ import absolute_import from apache_beam.transforms import window from apache_beam.transforms.core import CombinePerKey +from apache_beam.transforms.core import Create from apache_beam.transforms.core import Flatten from apache_beam.transforms.core import GroupByKey from apache_beam.transforms.core import Map @@ -222,12 +223,17 @@ def assert_that(actual, matcher, label='assert_that'): class AssertThat(PTransform): def expand(self, pcoll): - return (pcoll - | WindowInto(window.GlobalWindows()) - | "ToVoidKey" >> Map(lambda v: (None, v)) - | "Group" >> GroupByKey() - | "UnKey" >> Map(lambda (k, v): v) - | "Match" >> Map(matcher)) + # We must have at least a single element to ensure the matcher + # code gets run even if the input pcollection is empty. 
+ keyed_singleton = pcoll.pipeline | Create([(None, None)]) + keyed_actual = ( + pcoll + | WindowInto(window.GlobalWindows()) + | "ToVoidKey" >> Map(lambda v: (None, v))) + _ = ((keyed_singleton, keyed_actual) + | "Group" >> CoGroupByKey() + | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values) + | "Match" >> Map(matcher)) def default_label(self): return label http://git-wip-us.apache.org/repos/asf/beam/blob/c39b02d9/sdks/python/apache_beam/transforms/util_test.py -- diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py new file mode 100644 index 000..9656827 --- /dev/null +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the util transforms.""" + +import unittest + +from apache_beam import Create +from apache_beam.test_pipeline import TestPipeline +from apache_beam.transforms.util import assert_that, equal_to, is_empty + + +class UtilTest(unittest.TestCase): + + def test_assert_that_passes(self): +with TestPipeline() as p: + assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3])) + + def test_assert_that_fails(self): +with self.assertRaises(Exception): + with TestPipeline() as p: +assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3])) + + def test_assert_that_fails_on_empty_input(self): +with self.assertRaises(Exception): + with TestPipeline() as p: +assert_that(p | Create([]), equal_to([1, 2, 3])) + + def test_assert_that_fails_on_empty_expected(self): +with self.assertRaises(Exception): + with TestPipeline() as p: +assert_that(p | Create([1, 2, 3]), is_empty()) + + +if __name__ == '__main__': + unittest.main()
[GitHub] beam pull request #2279: [BEAM-1768] Fix assert_that for empty inputs
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2279 [BEAM-1768] Fix assert_that for empty inputs Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam passert Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2279 commit f46ac4d1e92d2080a97e0f29d084816f1b97af54 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-21T01:20:01Z [BEAM-1768] Fix assert_that for empty inputs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #176: Update quickstart-java.md
GitHub user robertwb opened a pull request: https://github.com/apache/beam-site/pull/176 Update quickstart-java.md I had to make this change for a clean build to work with the 0.6.0 source release. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam-site patch-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/176.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #176 commit 38e76e8a36d2f4c4c7bd7376d55dd8678e42d978 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-13T20:26:56Z Update quickstart-java.md I had to make this change for a clean build to work with the 0.6.0 source release. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2233: Better error on missing gcp dependencies.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2233 Better error on missing gcp dependencies. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam gcs-error Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2233.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2233 commit 00a3817463b07f7fb58f536ee354483ebb1012fd Author: Robert Bradshaw <rober...@google.com> Date: 2017-03-13T18:18:50Z Better error on missing gcp dependencies. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[7/9] beam git commit: Runner API encoding of WindowFns.
Runner API encoding of WindowFns. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a Branch: refs/heads/master Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f Parents: bc76a18 Author: Robert BradshawAuthored: Tue Mar 7 12:21:02 2017 -0800 Committer: Robert Bradshaw Committed: Thu Mar 9 20:29:01 2017 -0800 -- sdks/python/apache_beam/transforms/window.py| 117 +++ .../apache_beam/transforms/window_test.py | 11 ++ sdks/python/apache_beam/utils/proto_utils.py| 37 ++ sdks/python/apache_beam/utils/urns.py | 7 ++ 4 files changed, 172 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py -- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 14cf2f6..a562bcf 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -49,13 +49,20 @@ WindowFn. from __future__ import absolute_import +from google.protobuf import struct_pb2 +from google.protobuf import wrappers_pb2 + from apache_beam import coders +from apache_beam.internal import pickler +from apache_beam.runners.api import beam_runner_api_pb2 from apache_beam.transforms import timeutil from apache_beam.transforms.timeutil import Duration from apache_beam.transforms.timeutil import MAX_TIMESTAMP from apache_beam.transforms.timeutil import MIN_TIMESTAMP from apache_beam.transforms.timeutil import Timestamp from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils import proto_utils +from apache_beam.utils import urns # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their @@ -131,6 +138,41 @@ class WindowFn(object): # By default, just return the input timestamp. 
return input_timestamp + _known_urns = {} + + @classmethod + def register_urn(cls, urn, parameter_type, constructor): +cls._known_urns[urn] = parameter_type, constructor + + @classmethod + def from_runner_api(cls, fn_proto, context): +parameter_type, constructor = cls._known_urns[fn_proto.spec.urn] +return constructor( +proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type), +context) + + def to_runner_api(self, context): +urn, typed_param = self.to_runner_api_parameter(context) +return beam_runner_api_pb2.FunctionSpec( +spec=beam_runner_api_pb2.UrnWithParameter( +urn=urn, +parameter=proto_utils.pack_Any(typed_param))) + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): +return pickler.loads(fn_parameter.value) + + def to_runner_api_parameter(self, context): +raise TypeError(self) +return (urns.PICKLED_WINDOW_FN, +wrappers_pb2.BytesValue(value=pickler.dumps(self))) + + +WindowFn.register_urn( +urns.PICKLED_WINDOW_FN, +wrappers_pb2.BytesValue, +WindowFn.from_runner_api_parameter) + class BoundedWindow(object): """A window for timestamps in range (-infinity, end). @@ -251,6 +293,16 @@ class GlobalWindows(WindowFn): def __ne__(self, other): return not self == other + @staticmethod + def from_runner_api_parameter(unused_fn_parameter, unused_context): +return GlobalWindows() + + def to_runner_api_parameter(self, context): +return urns.GLOBAL_WINDOWS_FN, None + +WindowFn.register_urn( +urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter) + class FixedWindows(WindowFn): """A windowing function that assigns each element to one time interval. @@ -280,6 +332,29 @@ class FixedWindows(WindowFn): def merge(self, merge_context): pass # No merging. 
+ def __eq__(self, other): +if type(self) == type(other) == FixedWindows: + return self.size == other.size and self.offset == other.offset + + def __ne__(self, other): +return not self == other + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): +return FixedWindows( +size=Duration(micros=fn_parameter['size']), +offset=Timestamp(micros=fn_parameter['offset'])) + + def to_runner_api_parameter(self, context): +return (urns.FIXED_WINDOWS_FN, +proto_utils.pack_Struct(size=self.size.micros, +offset=self.offset.micros)) + +WindowFn.register_urn( +urns.FIXED_WINDOWS_FN, +struct_pb2.Struct, +FixedWindows.from_runner_api_parameter) + class SlidingWindows(WindowFn): """A windowing function
[8/9] beam git commit: Move pipeline context and add more tests.
# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the windowing classes.""" + +import unittest + +from apache_beam import coders +from apache_beam.runners import pipeline_context + + +class PipelineContextTest(unittest.TestCase): + + def test_deduplication(self): +context = pipeline_context.PipelineContext() +bytes_coder_ref = context.coders.get_id(coders.BytesCoder()) +bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder()) +self.assertEqual(bytes_coder_ref, bytes_coder_ref2) + + def test_serialization(self): +context = pipeline_context.PipelineContext() +float_coder_ref = context.coders.get_id(coders.FloatCoder()) +bytes_coder_ref = context.coders.get_id(coders.BytesCoder()) +proto = context.to_runner_api() +context2 = pipeline_context.PipelineContext.from_runner_api(proto) +self.assertEqual( +coders.FloatCoder(), +context2.coders.get_by_id(float_coder_ref)) +self.assertEqual( +coders.BytesCoder(), +context2.coders.get_by_id(bytes_coder_ref)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 1fc63b2..3251671 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1235,6 +1235,7 @@ class Windowing(object): trigger=self.triggerfn.to_runner_api(context), accumulation_mode=self.accumulation_mode, output_time=self.output_time_fn, +# TODO(robertwb): Support EMIT_IF_NONEMPTY closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, allowed_lateness=0) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py -- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index cc9e0f5..827aa33 100644 --- 
a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -25,6 +25,7 @@ import unittest import yaml import apache_beam as beam +from apache_beam.runners import pipeline_context from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import trigger from apache_beam.transforms.core import Windowing @@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase): AfterWatermark(early=AfterCount(1000), late=AfterCount(1)), Repeatedly(AfterCount(100)), trigger.OrFinally(AfterCount(3), AfterCount(10))): - context = beam.pipeline.PipelineContext() - self.assertEqual( - trigger_fn, - TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context)) - - def test_windowing_strategy_encoding(self): -for trigger_fn in ( -DefaultTrigger(), -AfterAll(AfterCount(1), AfterCount(10)), -AfterFirst(AfterCount(10), AfterCount(100)), -AfterEach(AfterCount(100), AfterCount(1000)), -AfterWatermark(early=AfterCount(1000)), -AfterWatermark(early=AfterCount(1000), late=AfterCount(1)), -Repeatedly(AfterCount(100)), -trigger.OrFinally(AfterCount(3), AfterCount(10))): - context = beam.pipeline.PipelineContext() + context = pipeline_context.PipelineContext() self.assertEqual( trigger_fn, TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context)) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py -- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c763a96..3878dff 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -73,6 +73,7 @@ class OutputTimeFn(object): OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE + # TODO(robertwb): Add this to the runner API or remove it. 
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED' @staticmethod @@ -167,7 +168,6 @@ class WindowFn(object): return pickler.loads(fn_parameter.value) def to_runner_api_parameter(self, context): -raise TypeError(self) return (urns.PICKLED_WINDOW_FN, wrappers_pb2.BytesValue(value=pickler.dumps(self))) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py
[3/9] beam git commit: Auto-generated runner api proto bindings.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py -- diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py new file mode 100644 index 000..66c331b --- /dev/null +++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py @@ -0,0 +1,2755 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: beam_runner_api.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='beam_runner_api.proto', + package='org.apache.beam.runner_api.v1', + syntax='proto3', + serialized_pb=_b('\n\x15\x62\x65\x61m_runner_api.proto\x12\x1dorg.apache.beam.runner_api.v1\x1a\x19google/protobuf/any.proto\"\x8d\x07\n\nComponents\x12M\n\ntransforms\x18\x01 \x03(\x0b\x32\x39.org.apache.beam.runner_api.v1.Components.TransformsEntry\x12Q\n\x0cpcollections\x18\x02 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.PcollectionsEntry\x12`\n\x14windowing_strategies\x18\x03 \x03(\x0b\x32\x42.org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry\x12\x45\n\x06\x63oders\x18\x04 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.Components.CodersEntry\x12Q\n\x0c\x65nvironments\x18\x05 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.EnvironmentsEntry\x1a\\\n\x0fTransformsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x38\n\x05value\x18\x02 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransform:\x02\x38\x01\x1a_\n\x11PcollectionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollection:\x02\ x38\x01\x1al\n\x18WindowingStrategiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategy:\x02\x38\x01\x1aS\n\x0b\x43odersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.Coder:\x02\x38\x01\x1a_\n\x11\x45nvironmentsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.Environment:\x02\x38\x01\"\xe4\x06\n\x15MessageWithComponents\x12=\n\ncomponents\x18\x01 \x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x35\n\x05\x63oder\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.CoderH\x00\x12H\n\x0f\x63ombine_payload\x18\x03 \x01(\x0b\x32-.org.apache.beam.runner_api.v1.CombinePayloadH\x00\x12\x44\n\rfunction_spec\x18\x04 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpecH\x00\x12\x45\n\x0epar_do_payload\x18\x06 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.ParDoPayloadH\x00\x12?\ n\nptransform\x18\x07 \x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransformH\x00\x12\x41\n\x0bpcollection\x18\x08 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollectionH\x00\x12\x42\n\x0cread_payload\x18\t \x01(\x0b\x32*.org.apache.beam.runner_api.v1.ReadPayloadH\x00\x12>\n\nside_input\x18\x0b \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInputH\x00\x12O\n\x13window_into_payload\x18\x0c \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowIntoPayloadH\x00\x12N\n\x12windowing_strategy\x18\r \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategyH\x00\x12M\n\x12urn_with_parameter\x18\x0e \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameterH\x00\x42\x06\n\x04root\"\xa6\x01\n\x08Pipeline\x12=\n\ncomponents\x18\x01 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x19\n\x11root_transform_id\x18\x02 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x03 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xa8\x03\n\nPTransform\x12\x13\n\x0bunique_name\x18\x05 \x01(\t\x12=\n\x04spec\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x15\n\rsubtransforms\x18\x02 \x03(\t\x12\x45\n\x06inputs\x18\x03 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.PTransform.InputsEntry\x12G\n\x07outputs\x18\x04 \x03(\x0b\x32\x36.org.apache.beam.runner_api.v1.PTransform.OutputsEntry\x12@\n\x0c\x64isplay_data\x18\x06 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\x1a-\n\x0bInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a.\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02
[1/9] beam git commit: Runner API context helper classes.
Repository: beam Updated Branches: refs/heads/master f13a84d67 -> 2c2424cb4 Runner API context helper classes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186 Branch: refs/heads/master Commit: bc76a186099568ef292ceb007388ae7174150bc2 Parents: 3bb125e Author: Robert BradshawAuthored: Tue Mar 7 12:04:27 2017 -0800 Committer: Robert Bradshaw Committed: Thu Mar 9 20:29:00 2017 -0800 -- sdks/python/apache_beam/pipeline.py | 62 1 file changed, 62 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/bc76a186/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 7db39a9..4ec2e47 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -52,11 +52,14 @@ import os import shutil import tempfile +from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints from apache_beam.internal import pickler from apache_beam.runners import create_runner from apache_beam.runners import PipelineRunner +from apache_beam.runners.api import beam_runner_api_pb2 +from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.typehints import TypeCheckError from apache_beam.utils.pipeline_options import PipelineOptions @@ -440,3 +443,62 @@ class AppliedPTransform(object): if v not in visited: visited.add(v) visitor.visit_value(v, self) + + +class PipelineContextMap(object): + """This is a bi-directional map between objects and ids. + + Under the hood it encodes and decodes these objects into runner API + representations. 
+ """ + def __init__(self, context, obj_type, proto_map=None): +self._pipeline_context = context +self._obj_type = obj_type +self._obj_to_id = {} +self._id_to_obj = {} +self._id_to_proto = proto_map if proto_map else {} +self._counter = 0 + + def _unique_ref(self): +self._counter += 1 +return "ref_%s_%s" % (self._obj_type.__name__, self._counter) + + def populate_map(self, proto_map): +for id, obj in self._id_to_obj: + proto_map[id] = self._id_to_proto[id] + + def get_id(self, obj): +if obj not in self._obj_to_id: + id = self._unique_ref() + self._id_to_obj[id] = obj + self._obj_to_id[obj] = id + self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context) +return self._obj_to_id[obj] + + def get_by_id(self, id): +if id not in self._id_to_obj: + self._id_to_obj[id] = self._obj_type.from_runner_api( +self._id_to_proto[id], self._pipeline_context) +return self._id_to_obj[id] + + +class PipelineContext(object): + + _COMPONENT_TYPES = { +'transforms': AppliedPTransform, +'pcollections': pvalue.PCollection, +'coders': coders.Coder, +'windowing_strategies': core.Windowing, +# TODO: environment + } + + def __init__(self, context_proto=None): +for name, cls in self._COMPONENT_TYPES.items(): + setattr(self, name, + PipelineContextMap(self, cls, getattr(context_proto, name, None))) + + def to_runner_api(self): +context_proto = beam_runner_api_pb2.Components() +for name, cls in self._COMPONENT_TYEPS: + getattr(self, name).populate_map(getattr(context_proto, name)) +return context_proto
[6/9] beam git commit: Runner API translation of triggers and windowing strategies.
nd other._is_default: +return True + else: +return ( +self.windowfn == other.windowfn +and self.triggerfn == other.triggerfn +and self.accumulation_mode == other.accumulation_mode +and self.output_time_fn == other.output_time_fn) + def is_default(self): return self._is_default + def to_runner_api(self, context): +return beam_runner_api_pb2.WindowingStrategy( +window_fn=self.windowfn.to_runner_api(context), +# TODO(robertwb): Prohibit implicit multi-level merging. +merge_status=(beam_runner_api_pb2.NEEDS_MERGE + if self.windowfn.is_merging() + else beam_runner_api_pb2.NON_MERGING), +window_coder_id=context.coders.get_id( +self.windowfn.get_window_coder()), +trigger=self.triggerfn.to_runner_api(context), +accumulation_mode=self.accumulation_mode, +output_time=self.output_time_fn, +closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, +allowed_lateness=0) + + @staticmethod + def from_runner_api(proto, context): +# pylint: disable=wrong-import-order, wrong-import-position +from apache_beam.transforms.trigger import TriggerFn +return Windowing( +windowfn=WindowFn.from_runner_api(proto.window_fn, context), +triggerfn=TriggerFn.from_runner_api(proto.trigger, context), +accumulation_mode=proto.accumulation_mode, +output_time_fn=proto.output_time) + @typehints.with_input_types(T) @typehints.with_output_types(T) http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py -- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 04198ba..b55d602 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import OutputTimeFn from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import WindowFn +from apache_beam.runners.api import beam_runner_api_pb2 class AccumulationMode(object): 
"""Controls what to do with data when a trigger fires multiple times. """ - DISCARDING = 1 - ACCUMULATING = 2 + DISCARDING = beam_runner_api_pb2.DISCARDING + ACCUMULATING = beam_runner_api_pb2.ACCUMULATING # TODO(robertwb): Provide retractions of previous outputs. # RETRACTING = 3 @@ -185,6 +186,26 @@ class TriggerFn(object): pass # pylint: enable=unused-argument + @staticmethod + def from_runner_api(proto, context): +return { +'after_all': AfterAll, +'after_any': AfterFirst, +'after_each': AfterEach, +'after_end_of_widow': AfterWatermark, +# after_processing_time, after_synchronized_processing_time +# always +'default': DefaultTrigger, +'element_count': AfterCount, +# never +'or_finally': OrFinally, +'repeat': Repeatedly, +}[proto.WhichOneof('trigger')].from_runner_api(proto, context) + + @abstractmethod + def to_runner_api(self, unused_context): +pass + class DefaultTrigger(TriggerFn): """Semantically Repeatedly(AfterWatermark()), but more optimized.""" @@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn): def __eq__(self, other): return type(self) == type(other) + @staticmethod + def from_runner_api(proto, context): +return DefaultTrigger() + + def to_runner_api(self, unused_context): +return beam_runner_api_pb2.Trigger( +default=beam_runner_api_pb2.Trigger.Default()) + class AfterWatermark(TriggerFn): """Fire exactly once when the watermark passes the end of the window. 
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn): def __repr__(self): qualifiers = [] if self.early: - qualifiers.append('early=%s' % self.early) + qualifiers.append('early=%s' % self.early.underlying) if self.late: - qualifiers.append('late=%s' % self.late) + qualifiers.append('late=%s' % self.late.underlying) return 'AfterWatermark(%s)' % ', '.join(qualifiers) def is_late(self, context): @@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn): def __hash__(self): return hash((type(self), self.early, self.late)) + @staticmethod + def from_runner_api(proto, context): +return AfterWatermark( +early=TriggerFn.from_runner_api( +proto.after_end_of_widow.early_firings, context) +if proto.after_end_of_widow.HasField('early_firings') +else None, +late=TriggerFn.from_runner_api( +proto.after_end_of_widow.late_firings, context) +if proto.after_end_of_widow.HasField('late_firings') +else
[2/9] beam git commit: Auto-generated runner api proto bindings.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/run_pylint.sh -- diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index afc5fb4..5e63856 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -34,7 +34,8 @@ EXCLUDED_GENERATED_FILES=( "apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py" "apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py" "apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py" -"apache_beam/coders/proto2_coder_test_messages_pb2.py") +"apache_beam/coders/proto2_coder_test_messages_pb2.py" +"apache_beam/runners/api/beam_runner_api_pb2.py") FILES_TO_IGNORE="" for file in "${EXCLUDED_GENERATED_FILES[@]}"; do
[9/9] beam git commit: Closes #2190
Closes #2190 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c2424cb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c2424cb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c2424cb Branch: refs/heads/master Commit: 2c2424cb44bb2976ea9099230106a639b5ee3993 Parents: f13a84d deff128 Author: Robert Bradshaw Authored: Thu Mar 9 20:29:03 2017 -0800 Committer: Robert Bradshaw Committed: Thu Mar 9 20:29:03 2017 -0800 -- sdks/python/apache_beam/coders/coders.py| 113 + sdks/python/apache_beam/runners/api/__init__.py | 16 + .../runners/api/beam_runner_api_pb2.py | 2772 ++ .../apache_beam/runners/pipeline_context.py | 88 + .../runners/pipeline_context_test.py| 49 + sdks/python/apache_beam/transforms/core.py | 39 + sdks/python/apache_beam/transforms/trigger.py | 143 +- .../apache_beam/transforms/trigger_test.py | 19 + sdks/python/apache_beam/transforms/window.py| 147 +- .../apache_beam/transforms/window_test.py | 32 + sdks/python/apache_beam/utils/proto_utils.py| 54 + sdks/python/apache_beam/utils/urns.py | 24 + sdks/python/run_pylint.sh |3 +- 13 files changed, 3481 insertions(+), 18 deletions(-) --
[5/9] beam git commit: Add license to new files.
Add license to new files. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2da21e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2da21e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2da21e2 Branch: refs/heads/master Commit: b2da21e287660bb3077bf89e092f7aa3c385906b Parents: 5b86e1f Author: Robert BradshawAuthored: Wed Mar 8 13:47:53 2017 -0800 Committer: Robert Bradshaw Committed: Thu Mar 9 20:29:01 2017 -0800 -- sdks/python/apache_beam/runners/api/__init__.py| 16 .../apache_beam/runners/api/beam_runner_api_pb2.py | 17 + sdks/python/apache_beam/utils/proto_utils.py | 17 + sdks/python/apache_beam/utils/urns.py | 17 + 4 files changed, 67 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/__init__.py -- diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py index e69de29..cce3aca 100644 --- a/sdks/python/apache_beam/runners/api/__init__.py +++ b/sdks/python/apache_beam/runners/api/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py -- diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py index 66c331b..f235ce8 100644 --- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py +++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + # Generated by the protocol buffer compiler. DO NOT EDIT! # source: beam_runner_api.proto http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/proto_utils.py -- diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index 0ece8f5..b4bfdca 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from google.protobuf import any_pb2 from google.protobuf import struct_pb2
[GitHub] beam pull request #2214: Ensure that assert_that takes a PCollection as its first argument.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2214 Ensure that assert_that takes a PCollection as its first argument. This can avoid (silent) error such as with test_pipeline.TestPipeline() as p: bad_value = "Create0" >> beam.Create([1]) # Note missing "p | " beam_util.assert_that(bad_value, beam_util.equal_to([0])) Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam assert-that Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2214 commit 479cc0181daa3b98ef6ccdc8c3c1512bc2e3efae Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-10T00:18:46Z Ensure that assert_that takes a PCollection as its first argument. This can avoid (silent) error such as with test_pipeline.TestPipeline() as p: bad_value = "Create0" >> beam.Create([1]) # Note missing "p | " beam_util.assert_that(bad_value, beam_util.equal_to([0])) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2191: Fix typo in proto: widow -> window.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2191 Fix typo in proto: widow -> window. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam widow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2191 commit 558731978ae8e62c59130314de9db57d253ba10f Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-08T01:36:41Z Fix typo in proto: widow -> window. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2190: [BEAM-115] Runner API representation of windowing strategies for Python
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2190 [BEAM-115] Runner API representation of windowing strategies for Python Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam py-runner-api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2190.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2190 commit 356266b26684ad7e6846eaba33f6744f365890cf Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-07T20:02:08Z Auto-generated runner api proto bindings. commit e18be9cb1dd665a10b7250209c28d10600614bdb Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-07T20:04:27Z Runner API context helper classes. commit 0624235719bb9f813e620939fc0e11ac713708cb Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-07T20:21:02Z Runner API encoding of WindowFns. commit 243ba920ee1682f8c7863c339b7d057c9fecb14c Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-03-08T00:18:02Z Runner API translation of triggers and windowing strategies. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: Updates Python SDK source API so that sources can report limited parallelism signals.
Repository: beam Updated Branches: refs/heads/master de9e8528c -> 8f5f19d11 Updates Python SDK source API so that sources can report limited parallelism signals. With this update Python BoundedSource/RangeTracker API can report consumed and remaining number of split points while performing a source read operations, similar to Java SDK sources. These signals can be used by runner implementations, for example, to perform scaling decisions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a7ae44 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a7ae44 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a7ae44 Branch: refs/heads/master Commit: 97a7ae449e1ccf6b08a0ee0bc2fc0a1b49924f1f Parents: de9e852 Author: Chamikara JayalathAuthored: Wed Jan 4 19:10:09 2017 -0800 Committer: Robert Bradshaw Committed: Thu Mar 2 17:19:08 2017 -0800 -- sdks/python/apache_beam/io/avroio.py| 33 +++- sdks/python/apache_beam/io/avroio_test.py | 31 sdks/python/apache_beam/io/iobase.py| 157 ++- sdks/python/apache_beam/io/range_trackers.py| 41 - .../apache_beam/io/range_trackers_test.py | 52 ++ sdks/python/apache_beam/io/textio.py| 17 +- sdks/python/apache_beam/io/textio_test.py | 13 ++ .../runners/dataflow/native_io/iobase.py| 13 +- 8 files changed, 342 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/97a7ae44/sdks/python/apache_beam/io/avroio.py -- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 5dab651..ab98530 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -28,6 +28,7 @@ from avro import schema import apache_beam as beam from apache_beam.io import filebasedsource from apache_beam.io import fileio +from apache_beam.io import iobase from apache_beam.io.iobase import Read from apache_beam.transforms import PTransform @@ -135,6 +136,7 @@ class _AvroUtils(object): ValueError: If the 
block cannot be read properly because the file doesn't match the specification. """ +offset = f.tell() decoder = avroio.BinaryDecoder(f) num_records = decoder.read_long() block_size = decoder.read_long() @@ -144,7 +146,8 @@ class _AvroUtils(object): raise ValueError('Unexpected sync marker (actual "%s" vs expected "%s"). ' 'Maybe the underlying avro file is corrupted?', sync_marker, expected_sync_marker) -return _AvroBlock(block_bytes, num_records, codec, schema) +size = f.tell() - offset +return _AvroBlock(block_bytes, num_records, codec, schema, offset, size) @staticmethod def advance_file_past_next_sync_marker(f, sync_marker): @@ -172,13 +175,22 @@ class _AvroUtils(object): class _AvroBlock(object): """Represents a block of an Avro file.""" - def __init__(self, block_bytes, num_records, codec, schema_string): + def __init__(self, block_bytes, num_records, codec, schema_string, + offset, size): # Decompress data early on (if needed) and thus decrease the number of # parallel copies of the data in memory at any given in time during # block iteration. self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec) self._num_records = num_records self._schema = schema.parse(schema_string) +self._offset = offset +self._size = size + + def size(self): +return self._size + + def offset(self): +return self._offset @staticmethod def _decompress_bytes(data, codec): @@ -232,12 +244,26 @@ class _AvroSource(filebasedsource.FileBasedSource): """ def read_records(self, file_name, range_tracker): +next_block_start = -1 + +def split_points_unclaimed(stop_position): + if next_block_start >= stop_position: +# Next block starts at or after the suggested stop position. Hence +# there will not be split points to be claimed for the range ending at +# suggested stop position. 
+return 0 + + return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN + +range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) + start_offset = range_tracker.start_position() if start_offset is None: start_offset = 0 with self.open_file(file_name) as f: - codec, schema_string, sync_marker = _AvroUtils.read_meta_data_from_file(f) + codec, schema_string, sync_marker = _AvroUtils.read_meta_data_from_file( + f) # We have to start at current position if previous bundle ended at the # end of a
[1/2] beam git commit: Inline rather than reference FunctionSpecs.
Repository: beam Updated Branches: refs/heads/master 79b1395c2 -> d84b06791 Inline rather than reference FunctionSpecs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d390406e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d390406e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d390406e Branch: refs/heads/master Commit: d390406e27112faed31233d7daef1f650a31cd0f Parents: 79b1395 Author: Robert BradshawAuthored: Tue Feb 28 15:51:24 2017 -0800 Committer: Robert Bradshaw Committed: Wed Mar 1 09:04:30 2017 -0800 -- .../src/main/proto/beam_runner_api.proto| 39 +--- .../beam/sdk/util/WindowingStrategies.java | 18 ++--- 2 files changed, 20 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/common/runner-api/src/main/proto/beam_runner_api.proto -- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 58532b2..44ead56 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -47,10 +47,6 @@ message Components { // (Required) A map from pipeline-scoped id to Environment. map environments = 5; - - // (Required) A map from pipeline-scoped id to FunctionSpec, - // a record for a particular user-defined function. - map function_specs = 6; } // A disjoint union of all the things that may contain references @@ -207,8 +203,8 @@ message PCollection { // The payload for the primitive ParDo transform. message ParDoPayload { - // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn. - string fn_id = 1; + // (Required) The FunctionSpec of the DoFn. + FunctionSpec do_fn = 1; // (Required) Additional pieces of context the DoFn may require that // are not otherwise represented in the payload. @@ -266,9 +262,8 @@ enum IsBounded { // The payload for the primitive Read transform. 
message ReadPayload { - // (Required) The pipeline-scoped id of the FunctionSpec of the source for - // this Read. - string source_id = 1; + // (Required) The FunctionSpec of the source for this Read. + FunctionSpec source = 1; // (Required) Whether the source is bounded or unbounded IsBounded is_bounded = 2; @@ -279,15 +274,15 @@ message ReadPayload { // The payload for the WindowInto transform. message WindowIntoPayload { - // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn. - string fn_id = 1; + // (Required) The FunctionSpec of the WindowFn. + FunctionSpec window_fn = 1; } // The payload for the special-but-not-primitive Combine transform. message CombinePayload { - // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn. - string fn_id = 1; + // (Required) The FunctionSpec of the CombineFn. + FunctionSpec combine_fn = 1; // (Required) A reference to the Coder to use for accumulators of the CombineFn string accumulator_coder_id = 2; @@ -325,10 +320,10 @@ message Coder { // TODO: consider inlining field on PCollection message WindowingStrategy { - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // assigns windows, merges windows, and shifts timestamps before they are + // (Required) The FunctionSpec of the UDF that assigns windows, + // merges windows, and shifts timestamps before they are // combined according to the OutputTime. - string fn_id = 1; + FunctionSpec window_fn = 1; // (Required) Whether or not the window fn is merging. // @@ -584,20 +579,20 @@ message SideInput { // URN) UrnWithParameter access_pattern = 1; - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // adapts a particular access_pattern to a user-facing view type. + // (Required) The FunctionSpec of the UDF that adapts a particular + // access_pattern to a user-facing view type. 
// // For example, View.asSingleton() may include a `view_fn` that adapts a // specially-designed multimap to a single value per window. - string view_fn_id = 2; + FunctionSpec view_fn = 2; - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // maps a main input window to a side input window. + // (Required) The FunctionSpec of the UDF that maps a main input window + // to a side input window. // // For example, when the main input is in fixed windows of one hour, this // can specify that the side input should be accessed according to the day // in which that hour falls. - string window_mapping_fn_id = 3; +
[2/2] beam git commit: Closes #2131
Closes #2131 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d84b0679 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d84b0679 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d84b0679 Branch: refs/heads/master Commit: d84b0679178ab803081ee3e05fe222d234e94c30 Parents: 79b1395 d390406 Author: Robert BradshawAuthored: Wed Mar 1 09:04:31 2017 -0800 Committer: Robert Bradshaw Committed: Wed Mar 1 09:04:31 2017 -0800 -- .../src/main/proto/beam_runner_api.proto| 39 +--- .../beam/sdk/util/WindowingStrategies.java | 18 ++--- 2 files changed, 20 insertions(+), 37 deletions(-) --
[1/3] beam git commit: Include cython tests in presubmits for linux platform
Repository: beam Updated Branches: refs/heads/master 3f4e008bf -> 352af855e Include cython tests in presubmits for linux platform Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a03dd4e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a03dd4e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a03dd4e Branch: refs/heads/master Commit: 2a03dd4e1c0ca0d72f44101c15fa3b824e245628 Parents: b498e11 Author: Vikas KedigehalliAuthored: Tue Feb 28 10:09:40 2017 -0800 Committer: Robert Bradshaw Committed: Tue Feb 28 16:53:05 2017 -0800 -- sdks/python/tox.ini | 22 +- 1 file changed, 21 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/2a03dd4e/sdks/python/tox.ini -- diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 521f106..927c211 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -16,7 +16,7 @@ ; [tox] -envlist = py27,py27gcp,lint,docs +envlist = py27,py27gcp,py27cython,lint,docs [pep8] # Disable all errors and warnings except for the ones related to blank lines. @@ -34,6 +34,26 @@ commands = python setup.py test passenv = TRAVIS* +[testenv:py27cython] +# cython tests are only expected to work in linux (2.x and 3.x) +# If we want to add other platforms in the future, it should be: +# `platform = linux2|darwin|...` +# See https://docs.python.org/2/library/sys.html#sys.platform for platform codes +platform = linux2 +# autocomplete_test depends on nose when invoked directly. +deps = + nose + cython +commands = + python --version + pip install -e .[test] + python apache_beam/examples/complete/autocomplete_test.py + python setup.py test + # Clean up all cython generated files. + find apache_beam -type f -name '*.c' -delete + find apache_beam -type f -name '*.so' -delete +passenv = TRAVIS* + [testenv:py27gcp] # autocomplete_test depends on nose when invoked directly. deps =
[3/3] beam git commit: Closes #2128
Closes #2128 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/352af855 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/352af855 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/352af855 Branch: refs/heads/master Commit: 352af855edf9e07978d1d6d2a61fde372f927165 Parents: 3f4e008 2a03dd4 Author: Robert BradshawAuthored: Tue Feb 28 16:53:06 2017 -0800 Committer: Robert Bradshaw Committed: Tue Feb 28 16:53:06 2017 -0800 -- sdks/python/apache_beam/coders/stream.pxd | 2 +- sdks/python/tox.ini | 22 +- 2 files changed, 22 insertions(+), 2 deletions(-) --
[2/3] beam git commit: Update output stream cython declaration
Update output stream cython declaration Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b498e115 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b498e115 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b498e115 Branch: refs/heads/master Commit: b498e115ac6ee043c45d2868ccf10e5132669c29 Parents: 3f4e008 Author: Vikas KedigehalliAuthored: Tue Feb 28 07:59:36 2017 -0800 Committer: Robert Bradshaw Committed: Tue Feb 28 16:53:05 2017 -0800 -- sdks/python/apache_beam/coders/stream.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b498e115/sdks/python/apache_beam/coders/stream.pxd -- diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index a1eb9f7..4e01a89 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -20,7 +20,7 @@ cimport libc.stdint cdef class OutputStream(object): cdef char* data - cdef size_t _size + cdef size_t buffer_size cdef size_t pos cpdef write(self, bytes b, bint nested=*)
[GitHub] beam pull request #2131: [BEAM-115] Inline rather than reference FunctionSpecs.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2131 [BEAM-115] Inline rather than reference FunctionSpecs. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam runner-protos Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2131 commit cbfce74aae64f9b353c07ced8427dc77ab1c31f4 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-02-28T23:51:24Z Inline rather than reference FunctionSpecs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Make side inputs a map, rather than embedding the name in the message.
Make side inputs a map, rather than embedding the name in the message. The "local" name only make sense in context. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d59b10f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d59b10f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d59b10f Branch: refs/heads/master Commit: 2d59b10f90bfaa1c455dee1545504d9da74a9163 Parents: 61e31e6 Author: Robert BradshawAuthored: Mon Feb 27 17:59:57 2017 -0800 Committer: Robert Bradshaw Committed: Tue Feb 28 14:39:34 2017 -0800 -- .../src/main/proto/beam_runner_api.proto| 25 +++- 1 file changed, 9 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/2d59b10f/sdks/common/runner-api/src/main/proto/beam_runner_api.proto -- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 258c278..58532b2 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -215,10 +215,9 @@ message ParDoPayload { // (may force runners to execute the ParDo differently) repeated Parameter parameters = 2; - // (Optional) An ordered list of side inputs, describing for each local name - // to the data to be provided and the expected access pattern. - // (the SDK may not be order-sensitive) - repeated SideInput side_inputs = 3; + // (Optional) A mapping of local input names to side inputs, describing + // the expected access pattern. + map side_inputs = 3; // (Optional) if the DoFn uses state, a list of the specs for cells. repeated StateSpec state_specs = 4; @@ -298,10 +297,9 @@ message CombinePayload { // (may force runners to execute the ParDo differently) repeated Parameter parameters = 3; - // (Optional) An ordered list of side inputs, describing for each local name - // to the data to be provided and the expected access pattern. 
- // (the SDK may not be order-sensitive) - repeated SideInput side_inputs = 4; + // (Optional) A mapping of local input names to side inputs, describing + // the expected access pattern. + map side_inputs = 4; } // A coder, the binary format for serialization and deserialization of data in @@ -575,11 +573,6 @@ message TimestampTransform { // A specification for how to "side input" a PCollection. message SideInput { - - // (Required) A local name for this side input, as interpreted by its - // parent PTransform (and/or its PTransform's UDFs). - string name = 1; - // (Required) URN of the access pattern required by the `view_fn` to present // the desired SDK-specific interface to a UDF. // @@ -589,14 +582,14 @@ message SideInput { // The only access pattern intended for Beam, because of its superior // performance possibilities, is "urn:beam:sideinput:multimap" (or some such // URN) - UrnWithParameter access_pattern = 3; + UrnWithParameter access_pattern = 1; // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that // adapts a particular access_pattern to a user-facing view type. // // For example, View.asSingleton() may include a `view_fn` that adapts a // specially-designed multimap to a single value per window. - string view_fn_id = 4; + string view_fn_id = 2; // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that // maps a main input window to a side input window. @@ -604,7 +597,7 @@ message SideInput { // For example, when the main input is in fixed windows of one hour, this // can specify that the side input should be accessed according to the day // in which that hour falls. - string window_mapping_fn_id = 5; + string window_mapping_fn_id = 3; } // An environment for executing UDFs. Generally an SDK container URL, but
[1/2] beam git commit: Closes #2124
Repository: beam Updated Branches: refs/heads/master 61e31e622 -> 3f4e008bf Closes #2124 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3f4e008b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3f4e008b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3f4e008b Branch: refs/heads/master Commit: 3f4e008bf5231771094878d92256b674730630dd Parents: 61e31e6 2d59b10 Author: Robert BradshawAuthored: Tue Feb 28 14:39:34 2017 -0800 Committer: Robert Bradshaw Committed: Tue Feb 28 14:39:34 2017 -0800 -- .../src/main/proto/beam_runner_api.proto| 25 +++- 1 file changed, 9 insertions(+), 16 deletions(-) --
[GitHub] beam pull request #2124: Make side inputs a map, rather than embedding the name in the message.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2124 Make side inputs a map, rather than embedding the name in the message. The "local" name only make sense in context. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam runner-protos Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2124.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2124 commit b2c0f93d0f4d63836a79956d75b6dd100e9138ba Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-02-28T01:59:57Z Make side inputs a map, rather than embedding the name in the message. The "local" name only make sense in context. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/3] beam git commit: Make access_pattern a URN with params.
Repository: beam Updated Branches: refs/heads/master c8dfb851c -> 16736a6b6 Make access_pattern a URN with params. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a0dcd3f9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a0dcd3f9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a0dcd3f9 Branch: refs/heads/master Commit: a0dcd3f97f48cdca02bae4c371e4e143d00fdba1 Parents: c8dfb85 Author: Robert BradshawAuthored: Fri Feb 24 15:53:39 2017 -0800 Committer: Robert Bradshaw Committed: Mon Feb 27 17:46:00 2017 -0800 -- sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/a0dcd3f9/sdks/common/runner-api/src/main/proto/beam_runner_api.proto -- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 989e4bb..ad6d0cb 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -591,7 +591,7 @@ message SideInput { // The only access pattern intended for Beam, because of its superior // performance possibilities, is "urn:beam:sideinput:multimap" (or some such // URN) - string access_pattern = 3; + UrnWithParameter access_pattern = 3; // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that // adapts a particular access_pattern to a user-facing view type.
[3/3] beam git commit: Closes #2106
Closes #2106 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16736a6b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16736a6b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16736a6b Branch: refs/heads/master Commit: 16736a6b612ba38b19694ae2a23dea7f5fab1563 Parents: c8dfb85 c04e8ab Author: Robert BradshawAuthored: Mon Feb 27 17:46:02 2017 -0800 Committer: Robert Bradshaw Committed: Mon Feb 27 17:46:02 2017 -0800 -- sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 8 +++- 1 file changed, 3 insertions(+), 5 deletions(-) --
[2/3] beam git commit: Clarify side input name and remove redundant pcollection reference.
Clarify side input name and remove redundant pcollection reference. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c04e8abd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c04e8abd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c04e8abd Branch: refs/heads/master Commit: c04e8abd41575a4e819c3f18dabff921338f55d8 Parents: a0dcd3f Author: Robert BradshawAuthored: Fri Feb 24 16:31:05 2017 -0800 Committer: Robert Bradshaw Committed: Mon Feb 27 17:46:01 2017 -0800 -- sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c04e8abd/sdks/common/runner-api/src/main/proto/beam_runner_api.proto -- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index ad6d0cb..258c278 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -576,12 +576,10 @@ message TimestampTransform { // A specification for how to "side input" a PCollection. message SideInput { - // (Required) A local name for this side input, as embedded in a serialized UDF. + // (Required) A local name for this side input, as interpreted by its + // parent PTransform (and/or its PTransform's UDFs). string name = 1; - // (Required) The pipeline-scoped unique id of the PCollection to be side input. - string pcollection_id = 2; - // (Required) URN of the access pattern required by the `view_fn` to present // the desired SDK-specific interface to a UDF. //
[GitHub] beam pull request #2106: [BEAM-115] More Runner API refinements.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/2106 [BEAM-115] More Runner API refinements. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam runner-protos Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2106.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2106 commit c2f5351e3731f29d024240016c553cecd3be3143 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-02-24T23:53:39Z Make access_pattern a URN with params. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Attach original traceback in direct runner failure.
Attach original traceback in direct runner failure. This will make it much easier to debug pipelines. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fba260d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fba260d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fba260d Branch: refs/heads/master Commit: 5fba260d89040b4325c872c2764a3981ad189df8 Parents: effca63 Author: Robert BradshawAuthored: Thu Feb 9 17:28:32 2017 -0800 Committer: Robert Bradshaw Committed: Fri Feb 10 13:22:02 2017 -0800 -- .../apache_beam/runners/direct/executor.py | 24 ++-- 1 file changed, 17 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5fba260d/sdks/python/apache_beam/runners/direct/executor.py -- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 2d4a8bd..ff0d372 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -22,6 +22,7 @@ from __future__ import absolute_import import collections import logging import Queue +import sys import threading import traceback from weakref import WeakValueDictionary @@ -364,7 +365,8 @@ class _ExecutorServiceParallelExecutor(object): update = self.visible_updates.take() try: if update.exception: -raise update.exception +t, v, tb = update.exc_info +raise t, v, tb finally: self.executor_service.shutdown() @@ -426,6 +428,10 @@ class _ExecutorServiceParallelExecutor(object): assert bool(produced_bundle) != bool(exception) self.committed_bundle = produced_bundle self.exception = exception + self.exc_info = sys.exc_info() + if self.exc_info[1] is not exception: +# Not the right exception. +self.exc_info = (exception, None, None) class VisibleExecutorUpdate(object): """An update of interest to the user. @@ -434,9 +440,10 @@ class _ExecutorServiceParallelExecutor(object): raise an exception. 
""" -def __init__(self, exception=None): - self.finished = exception is not None - self.exception = exception +def __init__(self, exc_info=(None, None, None)): + self.finished = exc_info[0] is not None + self.exception = exc_info[1] or exc_info[0] + self.exc_info = exc_info class _MonitorTask(ExecutorService.CallableTask): """MonitorTask continuously runs to ensure that pipeline makes progress.""" @@ -460,7 +467,7 @@ class _ExecutorServiceParallelExecutor(object): update.exception) self._executor.visible_updates.offer( _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( -update.exception)) +update.exc_info)) update = self._executor.all_updates.poll() self._executor.evaluation_context.schedule_pending_unblocked_tasks( self._executor.executor_service) @@ -468,7 +475,8 @@ class _ExecutorServiceParallelExecutor(object): except Exception as e: # pylint: disable=broad-except logging.error('Monitor task died due to exception.\n %s', e) self._executor.visible_updates.offer( -_ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e)) +_ExecutorServiceParallelExecutor.VisibleExecutorUpdate( +sys.exc_info())) finally: if not self._should_shutdown(): self._executor.executor_service.submit(self) @@ -502,7 +510,9 @@ class _ExecutorServiceParallelExecutor(object): # Nothing is scheduled for execution, but watermarks incomplete. self._executor.visible_updates.offer( _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - Exception('Monitor task detected a pipeline stall.'))) + (Exception('Monitor task detected a pipeline stall.'), + None, + None))) self._executor.executor_service.shutdown() return True
[1/2] beam git commit: Closes #1970
Repository: beam Updated Branches: refs/heads/master effca63f9 -> 51ef0009a Closes #1970 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51ef0009 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51ef0009 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51ef0009 Branch: refs/heads/master Commit: 51ef0009afd03b5a650836a8ae2cffab3c832476 Parents: effca63 5fba260 Author: Robert BradshawAuthored: Fri Feb 10 13:22:02 2017 -0800 Committer: Robert Bradshaw Committed: Fri Feb 10 13:22:02 2017 -0800 -- .../apache_beam/runners/direct/executor.py | 24 ++-- 1 file changed, 17 insertions(+), 7 deletions(-) --
[2/2] beam git commit: Closes #1959
Closes #1959 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caaa64dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caaa64dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caaa64dc Branch: refs/heads/master Commit: caaa64dc6c3f92be7a53349e9d58a5630db855a5 Parents: 9c4a784 7126467 Author: Robert BradshawAuthored: Fri Feb 10 09:47:48 2017 -0800 Committer: Robert Bradshaw Committed: Fri Feb 10 09:47:48 2017 -0800 -- sdks/python/apache_beam/runners/common.py | 13 + sdks/python/apache_beam/transforms/window_test.py | 16 2 files changed, 25 insertions(+), 4 deletions(-) --
[1/2] beam git commit: BEAM-1443: Preserve window duplication on window assignment.
Repository: beam Updated Branches: refs/heads/master 9c4a784bb -> caaa64dc6 BEAM-1443: Preserve window duplication on window assignment. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71264671 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71264671 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71264671 Branch: refs/heads/master Commit: 712646714b8efec6b024ebd2ba0f0e066280f36e Parents: 9c4a784 Author: Robert BradshawAuthored: Wed Feb 8 17:58:32 2017 -0800 Committer: Robert Bradshaw Committed: Fri Feb 10 09:47:47 2017 -0800 -- sdks/python/apache_beam/runners/common.py | 13 + sdks/python/apache_beam/transforms/window_test.py | 16 2 files changed, 25 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 6f86ca0..2c1032d 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -224,7 +224,7 @@ class DoFnRunner(Receiver): # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. We can just call it once # otherwise as none of the arguments are changing -if self.has_windowed_inputs and len(element.windows) > 1: +if self.has_windowed_inputs and len(element.windows) != 1: for w in element.windows: self._dofn_per_window_invoker( WindowedValue(element.value, element.timestamp, (w,))) @@ -280,7 +280,7 @@ class DoFnRunner(Receiver): else: raise - def _process_outputs(self, element, results): + def _process_outputs(self, windowed_input_element, results): """Dispatch the result of computation to the appropriate receivers. 
A value wrapped in a SideOutputValue object will be unwrapped and @@ -297,7 +297,10 @@ class DoFnRunner(Receiver): result = result.value if isinstance(result, WindowedValue): windowed_value = result - elif element is None: +if (windowed_input_element is not None +and len(windowed_input_element.windows) != 1): + windowed_value.windows *= len(windowed_input_element.windows) + elif windowed_input_element is None: # Start and finish have no element from which to grab context, # but may emit elements. if isinstance(result, TimestampedValue): @@ -315,8 +318,10 @@ class DoFnRunner(Receiver): windowed_value = WindowedValue( result.value, result.timestamp, self.window_fn.assign(assign_context)) +if len(windowed_input_element.windows) != 1: + windowed_value.windows *= len(windowed_input_element.windows) else: -windowed_value = element.with_value(result) +windowed_value = windowed_input_element.with_value(result) if tag is None: self.main_receivers.receive(windowed_value) else: http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/transforms/window_test.py -- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 1a21709..c4072ac 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -176,6 +176,22 @@ class WindowTest(unittest.TestCase): ('key', [5, 6, 7, 8, 9])])) p.run() + def test_rewindow(self): +p = TestPipeline() +result = (p + | Create([(k, k) for k in range(10)]) + | Map(lambda (x, t): TimestampedValue(x, t)) + | 'window' >> WindowInto(SlidingWindows(period=2, size=6)) + # Per the model, each element is now duplicated across + # three windows. Rewindowing must preserve this duplication. 
+ | 'rewindow' >> WindowInto(FixedWindows(5)) + | 'rewindow2' >> WindowInto(FixedWindows(5)) + | Map(lambda v: ('key', v)) + | GroupByKey()) +assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)), + ('key', sorted([5, 6, 7, 8, 9] * 3))])) +p.run() + def test_timestamped_with_combiners(self): p = TestPipeline() result = (p
[2/2] beam git commit: Closes #1969
Closes #1969 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ab60d89 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ab60d89 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ab60d89 Branch: refs/heads/master Commit: 6ab60d890d41518e0d6c6214bd03b5191157c7af Parents: 70a702e 1677e10 Author: Robert Bradshaw Authored: Thu Feb 9 17:34:21 2017 -0800 Committer: Robert Bradshaw Committed: Thu Feb 9 17:34:21 2017 -0800 -- sdks/python/apache_beam/io/textio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[GitHub] beam pull request #1970: Attach original exception in direct runner failure.
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/1970 Attach original exception in direct runner failure. This will make it much easier to debug pipelines. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam runner-exceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1970.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1970 commit 40fb5f01fe7327ff9b4de3d98166363b234873a0 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-02-10T01:28:32Z Attach original exception in direct runner failure. This will make it much easier to debug pipelines. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: [BEAM-1450] Fix NewDoFn handling of window explosion.
Repository: beam Updated Branches: refs/heads/master c26d5827b -> cd6802bec [BEAM-1450] Fix NewDoFn handling of window explosion. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b6fb1a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b6fb1a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b6fb1a Branch: refs/heads/master Commit: 37b6fb1a1c65ff53cef242321eb1ef4ddf48e022 Parents: c26d582 Author: Robert BradshawAuthored: Thu Feb 9 12:18:48 2017 -0800 Committer: Robert Bradshaw Committed: Thu Feb 9 15:06:28 2017 -0800 -- sdks/python/apache_beam/pipeline_test.py | 7 +++ sdks/python/apache_beam/runners/common.pxd | 3 +-- sdks/python/apache_beam/runners/common.py | 17 +++-- 3 files changed, 19 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 90b1a54..2f188aa 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -327,6 +327,13 @@ class DoFnTest(unittest.TestCase): | ParDo(TestDoFn())) assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)), (7, (0, 10)), (7, (5, 15))])) +pcoll2 = pcoll | 'Again' >> ParDo(TestDoFn()) +assert_that( +pcoll2, +equal_to([ +((1, (-5, 5)), (-5, 5)), ((1, (0, 10)), (0, 10)), +((7, (0, 10)), (0, 10)), ((7, (5, 15)), (5, 15))]), +label='doubled windows') pipeline.run() def test_timestamp_param(self): http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index f36fdd0..781d96b 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -50,8 +50,7 @@ cdef class DoFnRunner(Receiver): cpdef process(self, WindowedValue element) cdef 
_dofn_invoker(self, WindowedValue element) cdef _dofn_simple_invoker(self, WindowedValue element) - cdef _dofn_window_invoker( - self, WindowedValue element, list args, dict kwargs, object window) + cdef _dofn_per_window_invoker(self, WindowedValue element) @cython.locals(windowed_value=WindowedValue) cpdef _process_outputs(self, WindowedValue element, results) http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 50ccf22..6f86ca0 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -197,7 +197,13 @@ class DoFnRunner(Receiver): def _dofn_simple_invoker(self, element): self._process_outputs(element, self.dofn_process(element.value)) - def _dofn_window_invoker(self, element, args, kwargs, window): + def _dofn_per_window_invoker(self, element): +if self.has_windowed_inputs: + window, = element.windows + args, kwargs = util.insert_values_in_args( + self.args, self.kwargs, [si[window] for si in self.side_inputs]) +else: + args, kwargs = self.args, self.kwargs # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == for i, p in self.placeholders: if p == core.DoFn.ElementParam: @@ -218,13 +224,12 @@ class DoFnRunner(Receiver): # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. 
We can just call it once # otherwise as none of the arguments are changing -if self.has_windowed_inputs: +if self.has_windowed_inputs and len(element.windows) > 1: for w in element.windows: -args, kwargs = util.insert_values_in_args( -self.args, self.kwargs, [si[w] for si in self.side_inputs]) -self._dofn_window_invoker(element, args, kwargs, w) +self._dofn_per_window_invoker( +WindowedValue(element.value, element.timestamp, (w,))) else: - self._dofn_window_invoker(element, self.args, self.kwargs, None) + self._dofn_per_window_invoker(element) def _invoke_bundle_method(self, method): try:
[GitHub] beam pull request #1965: [BEAM-1450] Fix NewDoFn handling of window explosio...
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/1965 [BEAM-1450] Fix NewDoFn handling of window explosion. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam newdofn-windows Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1965.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1965 commit a7b5fe559e3796c2b31aa55ddb8ff2b4566f6a00 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-02-09T20:18:48Z [BEAM-1450] Fix NewDoFn handling of window explosion. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Closes #1845
Closes #1845 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f90558c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f90558c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f90558c3 Branch: refs/heads/master Commit: f90558c34ec90c1f43b119b160971deddc8b2c61 Parents: a26fd1f 98e513b Author: Robert Bradshaw Authored: Mon Feb 6 20:28:04 2017 -0800 Committer: Robert Bradshaw Committed: Mon Feb 6 20:28:04 2017 -0800 -- .../apache_beam/examples/snippets/snippets.py | 20 -- .../examples/snippets/snippets_test.py | 29 2 files changed, 40 insertions(+), 9 deletions(-) --
[1/2] beam git commit: Add snippet for reading from compressed text sources
Repository: beam Updated Branches: refs/heads/master a26fd1ff3 -> f90558c34 Add snippet for reading from compressed text sources Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98e513bc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98e513bc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98e513bc Branch: refs/heads/master Commit: 98e513bc64be3241d8a97902cba4808eec384a4b Parents: a26fd1f Author: Sourabh BajajAuthored: Mon Feb 6 17:23:14 2017 -0800 Committer: Robert Bradshaw Committed: Mon Feb 6 20:28:03 2017 -0800 -- .../apache_beam/examples/snippets/snippets.py | 20 -- .../examples/snippets/snippets_test.py | 29 2 files changed, 40 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/98e513bc/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 42d194e..c3879dc 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -828,8 +828,7 @@ def model_textio(renames): # [START model_textio_read] p = beam.Pipeline(options=PipelineOptions()) # [START model_pipelineio_read] - lines = p | 'ReadFromText' >> beam.io.ReadFromText( - 'gs://my_bucket/path/to/input-*.csv') + lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv') # [END model_pipelineio_read] # [END model_textio_read] @@ -837,7 +836,7 @@ def model_textio(renames): filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) # [START model_pipelineio_write] filtered_words | 'WriteToText' >> beam.io.WriteToText( - 'gs://my_bucket/path/to/numbers', file_name_suffix='.csv') + '/path/to/numbers', file_name_suffix='.csv') # [END model_pipelineio_write] # [END model_textio_write] @@ -845,6 +844,21 @@ def model_textio(renames): p.run().wait_until_finish() +def 
model_textio_compressed(renames, expected): + """Using a Read Transform to read compressed text files.""" + p = TestPipeline() + + # [START model_textio_write_compressed] + lines = p | 'ReadFromText' >> beam.io.ReadFromText( + '/path/to/input-*.csv.gz', + compression_type=beam.io.fileio.CompressionTypes.GZIP) + # [END model_textio_write_compressed] + + beam.assert_that(lines, beam.equal_to(expected)) + p.visit(SnippetUtils.RenameFiles(renames)) + p.run().wait_until_finish() + + def model_datastoreio(): """Using a Read and Write transform to read/write to Cloud Datastore.""" http://git-wip-us.apache.org/repos/asf/beam/blob/98e513bc/sdks/python/apache_beam/examples/snippets/snippets_test.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index a602b66..4827e94 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -18,6 +18,7 @@ """Tests for all code snippets used in public docs.""" import glob +import gzip import logging import os import tempfile @@ -355,13 +356,15 @@ class SnippetsTest(unittest.TestCase): To be used for testing. 
""" -def __init__(self, file_to_read=None): +def __init__(self, file_to_read=None, compression_type=None): self.file_to_read = file_to_read + self.compression_type = compression_type class ReadDoFn(beam.DoFn): - def __init__(self, file_to_read): + def __init__(self, file_to_read, compression_type): self.file_to_read = file_to_read +self.compression_type = compression_type self.coder = coders.StrUtf8Coder() def process(self, element): @@ -370,13 +373,19 @@ class SnippetsTest(unittest.TestCase): def finish_bundle(self): assert self.file_to_read for file_name in glob.glob(self.file_to_read): - with open(file_name) as file: -for record in file: - yield self.coder.decode(record.rstrip('\n')) + if self.compression_type is None: +with open(file_name) as file: + for record in file: +yield self.coder.decode(record.rstrip('\n')) + else: +with gzip.open(file_name, 'r') as file: + for record in file: +yield self.coder.decode(record.rstrip('\n')) def expand(self, pcoll): return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo( -
[1/2] beam git commit: Reduce cost of sample combine test
Repository: beam Updated Branches: refs/heads/master 2689ca43c -> 420a71860 Reduce cost of sample combine test There's no need to run this test 300 times, which takes tens of seconds. Also split global and per-key sampling into separate tests. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dc391d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dc391d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dc391d Branch: refs/heads/master Commit: d1dc391dd17345368a03187591a99ddf3d18282f Parents: 2689ca4 Author: Robert BradshawAuthored: Tue Jan 31 12:51:27 2017 -0800 Committer: Robert Bradshaw Committed: Wed Feb 1 10:56:53 2017 -0800 -- .../apache_beam/transforms/combiners_test.py| 32 1 file changed, 12 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d1dc391d/sdks/python/apache_beam/transforms/combiners_test.py -- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 8a6d352..ba8ae82 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -218,29 +218,21 @@ class CombineTest(unittest.TestCase): assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot') pipeline.run() - def test_sample(self): + def test_global_sample(self): -# First test global samples (lots of them). -for ix in xrange(300): - pipeline = TestPipeline() +def is_good_sample(actual): + assert len(actual) == 1 + assert sorted(actual[0]) in [[1, 1, 2], [1, 2, 2]], actual + +with TestPipeline() as pipeline: pcoll = pipeline | 'start' >> Create([1, 1, 2, 2]) - result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) - - def matcher(): -def match(actual): - # There is always exactly one result. - equal_to([1])([len(actual)]) - # There are always exactly three samples in the result. 
- equal_to([3])([len(actual[0])]) - # Sampling is without replacement. - num_ones = sum(1 for x in actual[0] if x == 1) - num_twos = sum(1 for x in actual[0] if x == 2) - equal_to([1, 2])([num_ones, num_twos]) -return match - assert_that(result, matcher()) - pipeline.run() + for ix in xrange(30): +assert_that( +pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3), +is_good_sample, +label='check-%d' % ix) -# Now test per-key samples. + def test_per_key_sample(self): pipeline = TestPipeline() pcoll = pipeline | 'start-perkey' >> Create( sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
[1/3] beam git commit: Implement combiner lifting for direct runner.
Repository: beam Updated Branches: refs/heads/master b80aac5e3 -> e9cd41165 Implement combiner lifting for direct runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87 Branch: refs/heads/master Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1 Parents: b80aac5 Author: Robert BradshawAuthored: Tue Jan 31 11:41:09 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 31 15:35:21 2017 -0800 -- .../apache_beam/runners/direct/direct_runner.py | 15 +++- .../runners/direct/helper_transforms.py | 77 2 files changed, 88 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py -- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index dc2668d..28dc012 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner): def __init__(self): self._cache = None + def apply_CombinePerKey(self, transform, pcoll): +# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems +# with resolving imports when they are at top. 
+# pylint: disable=wrong-import-position +from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey +try: + return pcoll | LiftedCombinePerKey( +transform.fn, transform.args, transform.kwargs) +except NotImplementedError: + return transform.expand(pcoll) + def run(self, pipeline): """Execute the entire pipeline and returns an DirectPipelineResult.""" @@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner): self._cache = BufferingInMemoryCache() return self._cache.pvalue_cache - def apply(self, transform, input): # pylint: disable=redefined-builtin -"""Runner callback for a pipeline.apply call.""" -return transform.expand(input) - class BufferingInMemoryCache(object): """PValueCache wrapper for buffering bundles until a PValue is fully computed. http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py -- diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py new file mode 100644 index 000..340db75 --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import collections +import itertools + +import apache_beam as beam +from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.internal.util import ArgumentPlaceholder + + +class LiftedCombinePerKey(beam.PTransform): + """An implementation of CombinePerKey that does mapper-side pre-combining. + """ + def __init__(self, combine_fn, args, kwargs): +if any(isinstance(arg, ArgumentPlaceholder) + for arg in itertools.chain(args, kwargs.values())): + # This isn't implemented in dataflow either... + raise NotImplementedError('Deferred CombineFn side inputs.') +self._combine_fn = beam.transforms.combiners.curry_combine_fn( +combine_fn, args, kwargs) + + def expand(self, pcoll): +return (pcoll + | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn)) + | beam.GroupByKey() + | beam.ParDo(FinishCombine(self._combine_fn))) + + +class PartialGroupByKeyCombiningValues(beam.DoFn): + """Aggregates values into a per-key-window cache. + + As bundles are in-memory-sized, we don't bother flushing until the very end. + """ + def __init__(self, combine_fn): +
[GitHub] beam pull request #1882: Combiner lifting
GitHub user robertwb opened a pull request: https://github.com/apache/beam/pull/1882 Combiner lifting Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam combiner-lifting Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1882.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1882 commit d25b14fc74f24bc88debbf3fd42a3bc775249462 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-01-31T19:41:09Z Implement combiner lifting for direct runner. commit 07e4154d4afbd0cbdda4419a7c18e6e338cef869 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-01-31T20:41:22Z Fix typehints tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #131: Clarify maturity of various runners.
GitHub user robertwb opened a pull request: https://github.com/apache/beam-site/pull/131 Clarify maturity of various runners. R: @francesperry You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam-site patch-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #131 commit 1835c0efdc17177a7fe6dd9e8d554fe89c4ba731 Author: Robert Bradshaw <rober...@gmail.com> Date: 2017-01-27T20:14:03Z Clarify maturity of various runners. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/3] beam git commit: Updating dataflow client protos to add new metrics.
Repository: beam Updated Branches: refs/heads/python-sdk 3d6f20d67 -> 52d97e2fc http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py -- diff --git a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py index 178a542..a42154e 100644 --- a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -24,6 +24,7 @@ and continuous computation. from apitools.base.protorpclite import messages as _messages from apitools.base.py import encoding +from apitools.base.py import extra_types package = 'dataflow' @@ -193,6 +194,7 @@ class CounterMetadata(_messages.Message): AND: Aggregated value represents the logical 'and' of all contributed values. SET: Aggregated value is a set of unique contributed values. + DISTRIBUTION: Aggregated value captures statistics about a distribution. """ INVALID = 0 SUM = 1 @@ -202,6 +204,7 @@ class CounterMetadata(_messages.Message): OR = 5 AND = 6 SET = 7 +DISTRIBUTION = 8 class StandardUnitsValueValuesEnum(_messages.Enum): """System defined Units, see above enum. @@ -308,6 +311,7 @@ class CounterUpdate(_messages.Message): aggregate value accumulated since the worker started working on this WorkItem. By default this is false, indicating that this counter is reported as a delta. +distribution: Distribution data floatingPoint: Floating point value for Sum, Max, Min. floatingPointList: List of floating point numbers, for Set. floatingPointMean: Floating point mean aggregation value for Mean. 
@@ -326,34 +330,38 @@ class CounterUpdate(_messages.Message): boolean = _messages.BooleanField(1) cumulative = _messages.BooleanField(2) - floatingPoint = _messages.FloatField(3) - floatingPointList = _messages.MessageField('FloatingPointList', 4) - floatingPointMean = _messages.MessageField('FloatingPointMean', 5) - integer = _messages.MessageField('SplitInt64', 6) - integerList = _messages.MessageField('IntegerList', 7) - integerMean = _messages.MessageField('IntegerMean', 8) - internal = _messages.MessageField('extra_types.JsonValue', 9) - nameAndKind = _messages.MessageField('NameAndKind', 10) - shortId = _messages.IntegerField(11) - stringList = _messages.MessageField('StringList', 12) - structuredNameAndMetadata = _messages.MessageField('CounterStructuredNameAndMetadata', 13) + distribution = _messages.MessageField('DistributionUpdate', 3) + floatingPoint = _messages.FloatField(4) + floatingPointList = _messages.MessageField('FloatingPointList', 5) + floatingPointMean = _messages.MessageField('FloatingPointMean', 6) + integer = _messages.MessageField('SplitInt64', 7) + integerList = _messages.MessageField('IntegerList', 8) + integerMean = _messages.MessageField('IntegerMean', 9) + internal = _messages.MessageField('extra_types.JsonValue', 10) + nameAndKind = _messages.MessageField('NameAndKind', 11) + shortId = _messages.IntegerField(12) + stringList = _messages.MessageField('StringList', 13) + structuredNameAndMetadata = _messages.MessageField('CounterStructuredNameAndMetadata', 14) class CreateJobFromTemplateRequest(_messages.Message): - """Request to create a Dataflow job. + """A request to create a Cloud Dataflow job from a template. Messages: -ParametersValue: Dynamic parameterization of the job's runtime - environment. +ParametersValue: The runtime parameters to pass to the job. Fields: -gcsPath: A path to the serialized JSON representation of the job. -parameters: Dynamic parameterization of the job's runtime environment. 
+environment: The runtime environment for the job. +gcsPath: Required. A Cloud Storage path to the template from which to + create the job. Must be a valid Cloud Storage URL, beginning with + `gs://`. +jobName: Required. The job name to use for the created job. +parameters: The runtime parameters to pass to the job. """ @encoding.MapUnrecognizedFields('additionalProperties') class ParametersValue(_messages.Message): -"""Dynamic parameterization of the job's runtime environment. +"""The runtime parameters to pass to the job. Messages: AdditionalProperty: An additional property for a ParametersValue object. @@ -375,8 +383,10 @@ class CreateJobFromTemplateRequest(_messages.Message): additionalProperties = _messages.MessageField('AdditionalProperty', 1, repeated=True) - gcsPath = _messages.StringField(1) - parameters = _messages.MessageField('ParametersValue', 2) + environment = _messages.MessageField('RuntimeEnvironment', 1) + gcsPath = _messages.StringField(2)
[3/3] beam git commit: Closes #1857
Closes #1857 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52d97e2f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52d97e2f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52d97e2f Branch: refs/heads/python-sdk Commit: 52d97e2fc2e383a58969447addd45ebe3eed4f5f Parents: 3d6f20d 901a14c Author: Robert Bradshaw Authored: Fri Jan 27 12:00:25 2017 -0800 Committer: Robert Bradshaw Committed: Fri Jan 27 12:00:25 2017 -0800 -- .../clients/dataflow/dataflow_v1b3_client.py | 578 .../clients/dataflow/dataflow_v1b3_messages.py | 931 +-- 2 files changed, 1075 insertions(+), 434 deletions(-) --
[2/3] beam git commit: Updating dataflow client protos to add new metrics.
Updating dataflow client protos to add new metrics. In order to use counter structured names, and add new metrics (e.g. Distributions), we need to update the dataflow client protocol buffers. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/901a14c4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/901a14c4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/901a14c4 Branch: refs/heads/python-sdk Commit: 901a14c47bd21fadd563de1017f7c9d2b38cf4f1 Parents: 3d6f20d Author: PabloAuthored: Thu Jan 26 18:02:04 2017 -0800 Committer: Pablo Committed: Fri Jan 27 11:46:26 2017 -0800 -- .../clients/dataflow/dataflow_v1b3_client.py| 578 .../clients/dataflow/dataflow_v1b3_messages.py | 931 +-- 2 files changed, 1075 insertions(+), 434 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py -- diff --git a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py index 840b887..6ae2b73 100644 --- a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py +++ b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py @@ -55,6 +55,10 @@ class DataflowV1b3(base_api.BaseApiClient): self.projects_jobs_messages = self.ProjectsJobsMessagesService(self) self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self) self.projects_jobs = self.ProjectsJobsService(self) +self.projects_locations_jobs_messages = self.ProjectsLocationsJobsMessagesService(self) +self.projects_locations_jobs_workItems = self.ProjectsLocationsJobsWorkItemsService(self) +self.projects_locations_jobs = self.ProjectsLocationsJobsService(self) +self.projects_locations = self.ProjectsLocationsService(self) self.projects_templates = self.ProjectsTemplatesService(self) self.projects = self.ProjectsService(self) @@ -65,33 
+69,6 @@ class DataflowV1b3(base_api.BaseApiClient): def __init__(self, client): super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client) - self._method_configs = { - 'GetConfig': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.getConfig', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', - request_field=u'getDebugConfigRequest', - request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', - response_type_name=u'GetDebugConfigResponse', - supports_download=False, - ), - 'SendCapture': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.sendCapture', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture', - request_field=u'sendDebugCaptureRequest', - request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest', - response_type_name=u'SendDebugCaptureResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -108,6 +85,19 @@ class DataflowV1b3(base_api.BaseApiClient): return self._RunMethod( config, request, global_params=global_params) +GetConfig.method_config = lambda: base_api.ApiMethodInfo( +http_method=u'POST', +method_id=u'dataflow.projects.jobs.debug.getConfig', +ordered_params=[u'projectId', u'jobId'], +path_params=[u'jobId', u'projectId'], +query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', +request_field=u'getDebugConfigRequest', +request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', +response_type_name=u'GetDebugConfigResponse', +supports_download=False, +) + def SendCapture(self, request, global_params=None): """Send encoded debug capture data for component. 
@@ -121,6 +111,19 @@ class DataflowV1b3(base_api.BaseApiClient): return self._RunMethod( config, request, global_params=global_params) +SendCapture.method_config = lambda: base_api.ApiMethodInfo( +http_method=u'POST', +
[1/2] beam git commit: Refactoring metrics infrastructure
ainer_stack() +self.PER_THREAD.container.pop() - @classmethod - def set_current_container(cls, container): -cls.PER_THREAD.container = container - @classmethod - def unset_current_container(cls): -del cls.PER_THREAD.container +MetricsEnvironment = _MetricsEnvironment() class MetricsContainer(object): @@ -180,16 +193,21 @@ class MetricsContainer(object): class ScopedMetricsContainer(object): - def __init__(self, container): -self._old_container = MetricsEnvironment.current_container() + def __init__(self, container=None): +self._stack = MetricsEnvironment.container_stack() self._container = container + def enter(self): +self._stack.append(self._container) + + def exit(self): +self._stack.pop() + def __enter__(self): -MetricsEnvironment.set_current_container(self._container) -return self._container +self.enter() def __exit__(self, type, value, traceback): -MetricsEnvironment.set_current_container(self._old_container) +self.exit() class MetricUpdates(object): http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 10d1f96..f41b313 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -18,6 +18,7 @@ cimport cython from apache_beam.utils.windowed_value cimport WindowedValue +from apache_beam.metrics.execution cimport ScopedMetricsContainer cdef type SideOutputValue, TimestampedValue @@ -40,6 +41,7 @@ cdef class DoFnRunner(Receiver): cdef object args cdef dict kwargs cdef object side_inputs + cdef ScopedMetricsContainer scoped_metrics_container cdef bint has_windowed_side_inputs cdef Receiver main_receivers http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 3741582..cb47513 100644 --- 
a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -22,6 +22,7 @@ import sys from apache_beam.internal import util +from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core from apache_beam.transforms import window @@ -69,7 +70,8 @@ class DoFnRunner(Receiver): logging_context=None, # Preferred alternative to context # TODO(robertwb): Remove once all runners are updated. - state=None): + state=None, + scoped_metrics_container=None): """Initializes a DoFnRunner. Args: @@ -84,10 +86,13 @@ class DoFnRunner(Receiver): step_name: the name of this step logging_context: a LoggingContext object state: handle for accessing DoFn state + scoped_metrics_container: Context switcher for metrics container """ self.step_name = step_name self.window_fn = windowing.windowfn self.tagged_receivers = tagged_receivers +self.scoped_metrics_container = (scoped_metrics_container + or ScopedMetricsContainer()) global_window = window.GlobalWindow() @@ -236,6 +241,7 @@ class DoFnRunner(Receiver): def _invoke_bundle_method(self, method): try: self.logging_context.enter() + self.scoped_metrics_container.enter() self.context.set_element(None) f = getattr(self.dofn, method) @@ -251,6 +257,7 @@ class DoFnRunner(Receiver): except BaseException as exn: self.reraise_augmented(exn) finally: + self.scoped_metrics_container.exit() self.logging_context.exit() def start(self): @@ -262,6 +269,7 @@ class DoFnRunner(Receiver): def process(self, element): try: self.logging_context.enter() + self.scoped_metrics_container.enter() if self.is_new_dofn: self.new_dofn_process(element) else: @@ -269,6 +277,7 @@ class DoFnRunner(Receiver): except BaseException as exn: self.reraise_augmented(exn) finally: + self.scoped_metrics_container.exit() self.logging_context.exit() def reraise_augmented(self, exn): 
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/executor.py -- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 7e404f8..2d4a8bd 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -27,7 +27,7 @@ impo
[2/2] beam git commit: Closes #1835
Closes #1835 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d6f20d6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d6f20d6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d6f20d6 Branch: refs/heads/python-sdk Commit: 3d6f20d677bd7397e6e2d099c829bf4c439f8d18 Parents: e3849af b148f5c Author: Robert Bradshaw Authored: Thu Jan 26 15:28:50 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 15:28:50 2017 -0800 -- sdks/python/apache_beam/metrics/execution.pxd | 31 + sdks/python/apache_beam/metrics/execution.py | 70 sdks/python/apache_beam/runners/common.pxd | 2 + sdks/python/apache_beam/runners/common.py | 11 ++- .../apache_beam/runners/direct/executor.py | 12 ++-- .../runners/direct/transform_evaluator.py | 54 --- sdks/python/setup.py | 1 + 7 files changed, 125 insertions(+), 56 deletions(-) --
[1/2] beam git commit: Fix read/write display data
Repository: beam Updated Branches: refs/heads/python-sdk c6420df97 -> e3849af8c Fix read/write display data Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3 Branch: refs/heads/python-sdk Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20 Parents: c6420df Author: PabloAuthored: Fri Jan 13 11:25:36 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 14:51:56 2017 -0800 -- sdks/python/apache_beam/io/avroio_test.py | 6 sdks/python/apache_beam/io/fileio.py | 10 ++- sdks/python/apache_beam/io/fileio_test.py | 2 -- sdks/python/apache_beam/io/iobase.py | 38 +- sdks/python/apache_beam/io/textio.py | 25 + sdks/python/apache_beam/io/textio_test.py | 30 6 files changed, 47 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py -- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index aed468d..d2fb1d1 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'), DisplayDataItemMatcher( -'shards', -0), -DisplayDataItemMatcher( 'codec', 'null'), DisplayDataItemMatcher( @@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'), DisplayDataItemMatcher( -'shards', -0), -DisplayDataItemMatcher( 'codec', 'deflate'), DisplayDataItemMatcher( http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 52f31c6..f67dca9 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ 
b/sdks/python/apache_beam/io/fileio.py @@ -547,7 +547,8 @@ class FileSink(iobase.Sink): def display_data(self): return {'shards': -DisplayDataItem(self.num_shards, label='Number of Shards'), +DisplayDataItem(self.num_shards, +label='Number of Shards').drop_if_default(0), 'compression': DisplayDataItem(str(self.compression_type)), 'file_pattern': @@ -787,6 +788,13 @@ class TextFileSink(FileSink): '\'textio.WriteToText()\' instead of directly ' 'instantiating a TextFileSink object.') + def display_data(self): +dd_parent = super(TextFileSink, self).display_data() +dd_parent['append_newline'] = DisplayDataItem( +self.append_trailing_newlines, +label='Append Trailing New Lines') +return dd_parent + def write_encoded_record(self, file_handle, encoded_value): """Writes a single encoded record.""" file_handle.write(encoded_value) http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py -- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index ad77dc5..6c33f53 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase): dd = DisplayData.create_from(sink) expected_items = [ DisplayDataItemMatcher( -'shards', 0), -DisplayDataItemMatcher( 'compression', 'auto'), DisplayDataItemMatcher( 'file_pattern', http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py -- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 12af3b6..1266ed3 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'WriteBundles' >> core.Map( -
[2/2] beam git commit: Closes #1776
Closes #1776 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3849af8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3849af8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3849af8 Branch: refs/heads/python-sdk Commit: e3849af8c8b0982de07f2c24417042be91474039 Parents: c6420df e4eda3c Author: Robert Bradshaw Authored: Thu Jan 26 14:51:57 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 14:51:57 2017 -0800 -- sdks/python/apache_beam/io/avroio_test.py | 6 sdks/python/apache_beam/io/fileio.py | 10 ++- sdks/python/apache_beam/io/fileio_test.py | 2 -- sdks/python/apache_beam/io/iobase.py | 38 +- sdks/python/apache_beam/io/textio.py | 25 + sdks/python/apache_beam/io/textio_test.py | 30 6 files changed, 47 insertions(+), 64 deletions(-) --
[GitHub] beam pull request #1837: [BEAM-1218 ] Remove dataflow_test.py
Github user robertwb closed the pull request at: https://github.com/apache/beam/pull/1837 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: Closes #1837
Closes #1837 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6420df9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6420df9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6420df9 Branch: refs/heads/python-sdk Commit: c6420df9791eb6083fba1f74bd88e06ce8f6a61f Parents: 4e1028b 2aa7d47 Author: Robert Bradshaw Authored: Wed Jan 25 16:18:10 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:18:10 2017 -0800 -- sdks/python/apache_beam/dataflow_test.py | 418 --- .../apache_beam/transforms/ptransform_test.py | 67 +++ .../apache_beam/transforms/sideinputs_test.py | 208 - 3 files changed, 274 insertions(+), 419 deletions(-) --
[1/2] beam git commit: Closes #1844
Repository: beam Updated Branches: refs/heads/python-sdk 592422059 -> 4e1028b3d Closes #1844 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e1028b3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e1028b3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e1028b3 Branch: refs/heads/python-sdk Commit: 4e1028b3dfeaf02e51eb9f3b5d1a5e78c1cfcbb9 Parents: 5924220 5787e81 Author: Robert Bradshaw Authored: Wed Jan 25 16:16:52 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:16:52 2017 -0800 -- .../python/apache_beam/utils/dependency_test.py | 47 +++- 1 file changed, 27 insertions(+), 20 deletions(-) --
[2/2] beam git commit: Use a temp directory for requirements cache in test_with_requirements_file
Use a temp directory for requirements cache in test_with_requirements_file The test fails if there are leftover files in the default folder for requirements cache either from earlier tests, or from the previous workspaces. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5787e817 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5787e817 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5787e817 Branch: refs/heads/python-sdk Commit: 5787e817a7eda4859963d535df21f2fa00edf8af Parents: 5924220 Author: Ahmet AltayAuthored: Wed Jan 25 09:57:18 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:16:52 2017 -0800 -- .../python/apache_beam/utils/dependency_test.py | 47 +++- 1 file changed, 27 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5787e817/sdks/python/apache_beam/utils/dependency_test.py -- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index a484d60..75a89e2 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -106,27 +106,34 @@ class SetupTest(unittest.TestCase): dependency.stage_job_resources(options)) def test_with_requirements_file(self): -staging_dir = tempfile.mkdtemp() -source_dir = tempfile.mkdtemp() +try: + staging_dir = tempfile.mkdtemp() + requirements_cache_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() -options = PipelineOptions() -options.view_as(GoogleCloudOptions).staging_location = staging_dir -self.update_options(options) -options.view_as(SetupOptions).requirements_file = os.path.join( -source_dir, dependency.REQUIREMENTS_FILE) -self.create_temp_file( -os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') -self.assertEqual( -sorted([dependency.REQUIREMENTS_FILE, -'abc.txt', 'def.txt']), -sorted(dependency.stage_job_resources( -options, 
-populate_requirements_cache=self.populate_requirements_cache))) -self.assertTrue( -os.path.isfile( -os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) -self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) -self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).requirements_cache = requirements_cache_dir + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, dependency.REQUIREMENTS_FILE) + self.create_temp_file( + os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') + self.assertEqual( + sorted([dependency.REQUIREMENTS_FILE, + 'abc.txt', 'def.txt']), + sorted(dependency.stage_job_resources( + options, + populate_requirements_cache=self.populate_requirements_cache))) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) +finally: + shutil.rmtree(staging_dir) + shutil.rmtree(requirements_cache_dir) + shutil.rmtree(source_dir) def test_requirements_file_not_present(self): staging_dir = tempfile.mkdtemp()
[GitHub] beam pull request #1811: Cleanup tests in pipeline_test.
Github user robertwb closed the pull request at: https://github.com/apache/beam/pull/1811 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---