[2/2] beam git commit: Closes #2586

2017-04-20 Thread robertwb
Closes #2586


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4121ec49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4121ec49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4121ec49

Branch: refs/heads/master
Commit: 4121ec490f2b3714cb668cc5811ba0bf88d610e5
Parents: e26bfbe c25380b
Author: Robert Bradshaw 
Authored: Thu Apr 20 08:59:52 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:59:52 2017 -0700

--
 .../examples/cookbook/bigquery_side_input.py  |  2 +-
 .../apache_beam/examples/cookbook/filters.py  |  2 +-
 sdks/python/apache_beam/io/concat_source_test.py  |  2 +-
 .../python/apache_beam/io/filebasedsource_test.py | 18 +-
 sdks/python/apache_beam/io/sources_test.py|  2 +-
 sdks/python/apache_beam/io/tfrecordio_test.py |  8 
 sdks/python/apache_beam/pipeline_test.py  |  2 +-
 sdks/python/apache_beam/transforms/core.py| 13 ++---
 8 files changed, 20 insertions(+), 29 deletions(-)
--




[2/2] beam git commit: Closes #2604

2017-04-20 Thread robertwb
Closes #2604


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e26bfbe0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e26bfbe0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e26bfbe0

Branch: refs/heads/master
Commit: e26bfbe0ae867494cd63dc4fb04473f67ce102fa
Parents: 7019aa7 815bbdc
Author: Robert Bradshaw 
Authored: Thu Apr 20 08:57:07 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:57:07 2017 -0700

--
 .../apache_beam/examples/cookbook/bigshuffle.py | 94 
 .../examples/cookbook/bigshuffle_test.py| 63 -
 2 files changed, 157 deletions(-)
--




[1/2] beam git commit: Remove bigshuffle from python examples

2017-04-20 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 7019aa70d -> e26bfbe0a


Remove bigshuffle from python examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/815bbdcc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/815bbdcc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/815bbdcc

Branch: refs/heads/master
Commit: 815bbdccc6fd9f0cfcd3b9fb86476b61eb08b293
Parents: 7019aa7
Author: Vikas Kedigehalli 
Authored: Wed Apr 19 17:12:52 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:57:06 2017 -0700

--
 .../apache_beam/examples/cookbook/bigshuffle.py | 94 
 .../examples/cookbook/bigshuffle_test.py| 63 -
 2 files changed, 157 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/815bbdcc/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py 
b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
deleted file mode 100644
index 79cc85c..000
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A BigShuffle workflow."""
-
-from __future__ import absolute_import
-
-import argparse
-import binascii
-import logging
-
-
-import apache_beam as beam
-from apache_beam.io import ReadFromText
-from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
-
-
-def crc32line(line):
-  return binascii.crc32(line) & 0x
-
-
-def run(argv=None):
-  # pylint: disable=expression-not-assigned
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument('--input',
-  required=True,
-  help='Input file pattern to process.')
-  parser.add_argument('--output',
-  required=True,
-  help='Output file pattern to write results to.')
-  parser.add_argument('--checksum_output',
-  help='Checksum output file pattern.')
-  known_args, pipeline_args = parser.parse_known_args(argv)
-  # We use the save_main_session option because one or more DoFn's in this
-  # workflow rely on global context (e.g., a module imported at module level).
-  pipeline_options = PipelineOptions(pipeline_args)
-  pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Read the text file[pattern] into a PCollection.
-  lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder())
-
-  # Count the occurrences of each word.
-  output = (lines
-| 'split' >> beam.Map(
-lambda x: (x[:10], x[10:99]))
-.with_output_types(beam.typehints.KV[str, str])
-| 'group' >> beam.GroupByKey()
-| 'format' >> beam.FlatMap(
-lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
-
-  # Write the output using a "Write" transform that has side effects.
-  output | WriteToText(known_args.output)
-
-  # Optionally write the input and output checksums.
-  if known_args.checksum_output:
-input_csum = (lines
-  | 'input-csum' >> beam.Map(crc32line)
-  | 'combine-input-csum' >> beam.CombineGlobally(sum)
-  | 'hex-format' >> beam.Map(lambda x: '%x' % x))
-input_csum | 'write-input-csum' >> WriteToText(
-known_args.checksum_output + '-input')
-
-output_csum = (output
-   | 'output-csum' >> beam.Map(crc32line)
-   | 'combine-output-csum' >> beam.CombineGlobally(sum)
-   | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
-output_csum | 'write-output-csum' >> WriteToText(
-known_args.checksum_output + '-output')
-
-  # Actually run the pipeline (all operations above are deferred).
-  return p.run()
-
-
-if 

[1/6] beam git commit: Make stage names consistent.

2017-04-20 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 4e0d5f596 -> 7019aa70d


Make stage names consistent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209

Branch: refs/heads/master
Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba
Parents: 4e0d5f5
Author: Robert Bradshaw 
Authored: Tue Apr 11 10:29:15 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:02 2017 -0700

--
 sdks/python/apache_beam/io/gcp/bigquery.py   |  6 +++---
 .../io/gcp/datastore/v1/datastoreio.py   |  6 +++---
 sdks/python/apache_beam/io/iobase.py |  2 +-
 sdks/python/apache_beam/pipeline.py  |  4 ++--
 sdks/python/apache_beam/transforms/core.py   | 19 ++-
 sdks/python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py|  4 ++--
 7 files changed, 22 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4686518..891f62a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side 
table. The runner
 may use some caching techniques to share the side inputs between calls in order
 to avoid excessive reading:::
 
-  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
+  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
   main_table
-  | 'process data' >> beam.Map(
+  | 'ProcessData' >> beam.Map(
   lambda element, side_input: ..., AsList(side_table)))
 
 There is no difference in how main and side inputs are read. What makes the

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
--
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py 
b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index d9b3598..a0ccbbb 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform):
 #   outputs a ``PCollection[Entity]``.
 
 queries = (pcoll.pipeline
-   | 'User Query' >> Create([self._query])
-   | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn(
+   | 'UserQuery' >> Create([self._query])
+   | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn(
self._project, self._query, self._datastore_namespace,
self._num_splits)))
 
 sharded_queries = (queries
| GroupByKey()
| Values()
-   | 'flatten' >> FlatMap(lambda x: x))
+   | 'Flatten' >> FlatMap(lambda x: x))
 
 entities = sharded_queries | 'Read' >> ParDo(
 ReadFromDatastore.ReadFn(self._project, self._datastore_namespace))

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py
--
diff --git a/sdks/python/apache_beam/io/iobase.py 
b/sdks/python/apache_beam/io/iobase.py
index d9df5c4..2cac67f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform):
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
| 'Extract' >> core.FlatMap(lambda x: x[1]))
-return do_once | 'finalize_write' >> core.FlatMap(
+return do_once | 'FinalizeWrite' >> core.FlatMap(
 _finalize_write,
 self.sink,
 AsSingleton(init_result_coll),

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 2ff9eb3..8e811bc 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -32,11 +32,11 

[6/6] beam git commit: Closes #2552

2017-04-20 Thread robertwb
Closes #2552


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7019aa70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7019aa70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7019aa70

Branch: refs/heads/master
Commit: 7019aa70db62575ae275ae93155bb0903a1ff26e
Parents: 4e0d5f5 2c34719
Author: Robert Bradshaw 
Authored: Thu Apr 20 08:55:06 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:06 2017 -0700

--
 sdks/python/apache_beam/coders/coders.py| 16 -
 .../apache_beam/coders/coders_test_common.py|  7 ++--
 sdks/python/apache_beam/io/gcp/bigquery.py  |  6 ++--
 .../io/gcp/datastore/v1/datastoreio.py  |  6 ++--
 sdks/python/apache_beam/io/iobase.py|  2 +-
 sdks/python/apache_beam/pipeline.py |  4 +--
 sdks/python/apache_beam/runners/runner.py   | 34 
 sdks/python/apache_beam/transforms/core.py  | 23 +++--
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   |  4 +--
 sdks/python/apache_beam/transforms/trigger.py   | 10 +++---
 .../apache_beam/transforms/trigger_test.py  | 10 +++---
 sdks/python/apache_beam/transforms/window.py| 18 ++-
 13 files changed, 58 insertions(+), 84 deletions(-)
--




[2/6] beam git commit: Require deterministic window coders.

2017-04-20 Thread robertwb
Require deterministic window coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2

Branch: refs/heads/master
Commit: d98294c2bd13b45522ea584485bd62e900144c88
Parents: 72f5020
Author: Robert Bradshaw 
Authored: Tue Apr 11 10:49:13 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:03 2017 -0700

--
 sdks/python/apache_beam/coders/coders.py  | 16 
 .../apache_beam/coders/coders_test_common.py  |  1 -
 sdks/python/apache_beam/transforms/core.py|  4 
 sdks/python/apache_beam/transforms/window.py  | 18 +-
 4 files changed, 21 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py
--
diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index 8ef0a46..4f75182 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -688,22 +688,6 @@ class IterableCoder(FastCoder):
 return hash((type(self), self._elem_coder))
 
 
-class WindowCoder(PickleCoder):
-  """Coder for windows in windowed values."""
-
-  def _create_impl(self):
-return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
-  def is_deterministic(self):
-# Note that WindowCoder as implemented is not deterministic because the
-# implementation simply pickles windows.  See the corresponding comments
-# on PickleCoder for more details.
-return False
-
-  def as_cloud_object(self):
-return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
 class GlobalWindowCoder(SingletonCoder):
   """Coder for global windows."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index 6491ea8..da0bde3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase):
  coders.FastCoder,
  coders.ProtoCoder,
  coders.ToStringCoder,
- coders.WindowCoder,
  coders.IntervalWindowCoder])
 assert not standard - cls.seen, standard - cls.seen
 assert not standard - cls.seen_nested, standard - cls.seen_nested

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index 2d28eec..9f66c39 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1185,6 +1185,10 @@ class Windowing(object):
   else:
 raise ValueError(
 'accumulation_mode must be provided for non-trivial triggers')
+if not windowfn.get_window_coder().is_deterministic():
+  raise ValueError(
+  'window fn (%s) does not have a determanistic coder (%s)' % (
+  window_fn, windowfn.get_window_coder()))
 self.windowfn = windowfn
 self.triggerfn = triggerfn
 self.accumulation_mode = accumulation_mode

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py
--
diff --git a/sdks/python/apache_beam/transforms/window.py 
b/sdks/python/apache_beam/transforms/window.py
index 931a17d..643cb99 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,6 +49,8 @@ WindowFn.
 
 from __future__ import absolute_import
 
+import abc
+
 from google.protobuf import struct_pb2
 from google.protobuf import wrappers_pb2
 
@@ -93,6 +95,8 @@ class OutputTimeFn(object):
 class WindowFn(object):
   """An abstract windowing function defining a basic assign and merge."""
 
+  __metaclass__ = abc.ABCMeta
+
   class AssignContext(object):
 """Context passed to WindowFn.assign()."""
 
@@ -100,6 +104,7 @@ class WindowFn(object):
   self.timestamp = Timestamp.of(timestamp)
   self.element = element
 
+  @abc.abstractmethod
   def assign(self, assign_context):
 """Associates a timestamp to an element."""
 raise NotImplementedError
@@ 

[3/6] beam git commit: Remove obsolete and unused Runner.clear

2017-04-20 Thread robertwb
Remove obsolete and unused Runner.clear


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b9029ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b9029ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b9029ac

Branch: refs/heads/master
Commit: 4b9029ac3e965ed3629833138597f1fb365b0876
Parents: 68c0042
Author: Robert Bradshaw 
Authored: Tue Apr 11 10:57:35 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:04 2017 -0700

--
 sdks/python/apache_beam/runners/runner.py | 34 --
 1 file changed, 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4b9029ac/sdks/python/apache_beam/runners/runner.py
--
diff --git a/sdks/python/apache_beam/runners/runner.py 
b/sdks/python/apache_beam/runners/runner.py
index ccb066b..b35cb7b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -99,8 +99,6 @@ class PipelineRunner(object):
 
   The base runner provides a run() method for visiting every node in the
   pipeline's DAG and executing the transforms computing the PValue in the node.
-  It also provides a clear() method for visiting every node and clearing out
-  the values contained in PValue objects produced during a run.
 
   A custom runner will typically provide implementations for some of the
   transform methods (ParDo, GroupByKey, Create, etc.). It may also
@@ -129,38 +127,6 @@ class PipelineRunner(object):
 
 pipeline.visit(RunVisitor(self))
 
-  def clear(self, pipeline, node=None):
-"""Clear all nodes or nodes reachable from node of materialized values.
-
-Args:
-  pipeline: Pipeline object containing PValues to be cleared.
-  node: Optional node in the Pipeline processing DAG. If specified only
-nodes reachable from this node will be cleared (ancestors of the node).
-
-This method is not intended (for now) to be called by users of Runner
-objects. It is a hook for future layers on top of the current programming
-model to control how much of the previously computed values are kept
-around. Presumably an interactivity layer will use it. The simplest way
-to change the behavior would be to define a runner that overwrites the
-clear_pvalue() method since this method (runner.clear) will visit all
-relevant nodes and call clear_pvalue on them.
-
-"""
-
-# Imported here to avoid circular dependencies.
-# pylint: disable=wrong-import-order, wrong-import-position
-from apache_beam.pipeline import PipelineVisitor
-
-class ClearVisitor(PipelineVisitor):
-
-  def __init__(self, runner):
-self.runner = runner
-
-  def visit_value(self, value, _):
-self.runner.clear_pvalue(value)
-
-pipeline.visit(ClearVisitor(self), node=node)
-
   def apply(self, transform, input):
 """Runner callback for a pipeline.apply call.
 



[4/6] beam git commit: Enable IntervalWindowCoder test check.

2017-04-20 Thread robertwb
Enable IntervalWindowCoder test check.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68c00426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68c00426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68c00426

Branch: refs/heads/master
Commit: 68c00426261c54a32be509f9ab53cf3543d49d78
Parents: d98294c
Author: Robert Bradshaw 
Authored: Tue Apr 11 10:52:44 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:04 2017 -0700

--
 sdks/python/apache_beam/coders/coders_test_common.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/68c00426/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index da0bde3..e5bfe35 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -61,8 +61,7 @@ class CodersTest(unittest.TestCase):
 standard -= set([coders.Coder,
  coders.FastCoder,
  coders.ProtoCoder,
- coders.ToStringCoder,
- coders.IntervalWindowCoder])
+ coders.ToStringCoder])
 assert not standard - cls.seen, standard - cls.seen
 assert not standard - cls.seen_nested, standard - cls.seen_nested
 
@@ -171,6 +170,9 @@ class CodersTest(unittest.TestCase):
  *[window.IntervalWindow(x, y)
for x in [-2**52, 0, 2**52]
for y in range(-100, 100)])
+self.check_coder(
+coders.TupleCoder((coders.IntervalWindowCoder(),)),
+(window.IntervalWindow(0, 10),))
 
   def test_timestamp_coder(self):
 self.check_coder(coders.TimestampCoder(),



[5/6] beam git commit: Rename AfterFirst to AfterAny for consistency with Java.

2017-04-20 Thread robertwb
Rename AfterFirst to AfterAny for consistency with Java.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c347192
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c347192
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c347192

Branch: refs/heads/master
Commit: 2c347192709e4076a1ac59b5ea769d4d3b2494f4
Parents: 4b9029a
Author: Robert Bradshaw 
Authored: Tue Apr 11 11:00:04 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:55:05 2017 -0700

--
 sdks/python/apache_beam/transforms/trigger.py  | 10 ++
 sdks/python/apache_beam/transforms/trigger_test.py | 10 +-
 2 files changed, 11 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger.py
--
diff --git a/sdks/python/apache_beam/transforms/trigger.py 
b/sdks/python/apache_beam/transforms/trigger.py
index 6a4cf24..b9786f4 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -186,7 +186,7 @@ class TriggerFn(object):
   def from_runner_api(proto, context):
 return {
 'after_all': AfterAll,
-'after_any': AfterFirst,
+'after_any': AfterAny,
 'after_each': AfterEach,
 'after_end_of_window': AfterWatermark,
 # after_processing_time, after_synchronized_processing_time
@@ -488,7 +488,8 @@ class ParallelTriggerFn(TriggerFn):
 in proto.after_all.subtriggers or proto.after_any.subtriggers]
 if proto.after_all.subtriggers:
   return AfterAll(*subtriggers)
-return AfterFirst(*subtriggers)
+else:
+  return AfterAny(*subtriggers)
 
   def to_runner_api(self, context):
 subtriggers = [
@@ -505,7 +506,7 @@ class ParallelTriggerFn(TriggerFn):
   raise NotImplementedError(self)
 
 
-class AfterFirst(ParallelTriggerFn):
+class AfterAny(ParallelTriggerFn):
   """Fires when any subtrigger fires.
 
   Also finishes when any subtrigger finishes.
@@ -589,7 +590,8 @@ class AfterEach(TriggerFn):
 for subtrigger in self.triggers]))
 
 
-class OrFinally(AfterFirst):
+class OrFinally(AfterAny):
+
   @staticmethod
   def from_runner_api(proto, context):
 return OrFinally(

http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger_test.py
--
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py 
b/sdks/python/apache_beam/transforms/trigger_test.py
index 9f2046a..914babb 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -33,7 +33,7 @@ from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterAll
 from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.trigger import AfterEach
-from apache_beam.transforms.trigger import AfterFirst
+from apache_beam.transforms.trigger import AfterAny
 from apache_beam.transforms.trigger import AfterWatermark
 from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
@@ -217,7 +217,7 @@ class TriggerTest(unittest.TestCase):
   def test_fixed_after_first(self):
 self.run_trigger_simple(
 FixedWindows(10),  # pyformat break
-AfterFirst(AfterCount(2), AfterWatermark()),
+AfterAny(AfterCount(2), AfterWatermark()),
 AccumulationMode.ACCUMULATING,
 [(1, 'a'), (2, 'b'), (3, 'c')],
 {IntervalWindow(0, 10): [set('ab')]},
@@ -225,7 +225,7 @@ class TriggerTest(unittest.TestCase):
 2)
 self.run_trigger_simple(
 FixedWindows(10),  # pyformat break
-AfterFirst(AfterCount(5), AfterWatermark()),
+AfterAny(AfterCount(5), AfterWatermark()),
 AccumulationMode.ACCUMULATING,
 [(1, 'a'), (2, 'b'), (3, 'c')],
 {IntervalWindow(0, 10): [set('abc')]},
@@ -236,7 +236,7 @@ class TriggerTest(unittest.TestCase):
   def test_repeatedly_after_first(self):
 self.run_trigger_simple(
 FixedWindows(100),  # pyformat break
-Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())),
+Repeatedly(AfterAny(AfterCount(3), AfterWatermark())),
 AccumulationMode.ACCUMULATING,
 zip(range(7), 'abcdefg'),
 {IntervalWindow(0, 100): [
@@ -388,7 +388,7 @@ class RunnerApiTest(unittest.TestCase):
 for trigger_fn in (
 DefaultTrigger(),
 AfterAll(AfterCount(1), AfterCount(10)),
-AfterFirst(AfterCount(10), AfterCount(100)),
+AfterAny(AfterCount(10), AfterCount(100)),
 

[1/2] beam git commit: [BEAM-662] Fix for allowing floating point periods in windows

2017-04-20 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 4e0c8333c -> 4e0d5f596


[BEAM-662] Fix for allowing floating point periods in windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc1bdd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc1bdd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc1bdd3

Branch: refs/heads/master
Commit: 1bc1bdd33494b4123855e2e3c9fa823654b31998
Parents: 4e0c833
Author: Sourabh Bajaj 
Authored: Wed Apr 19 18:20:11 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:53:55 2017 -0700

--
 sdks/python/apache_beam/transforms/window.py  | 10 ++
 sdks/python/apache_beam/transforms/window_test.py | 14 ++
 sdks/python/apache_beam/utils/timestamp.py|  4 
 3 files changed, 20 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window.py
--
diff --git a/sdks/python/apache_beam/transforms/window.py 
b/sdks/python/apache_beam/transforms/window.py
index 319a7b4..931a17d 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -388,13 +388,15 @@ class SlidingWindows(NonMergingWindowFn):
   raise ValueError('The size parameter must be strictly positive.')
 self.size = Duration.of(size)
 self.period = Duration.of(period)
-self.offset = Timestamp.of(offset) % size
+self.offset = Timestamp.of(offset) % period
 
   def assign(self, context):
 timestamp = context.timestamp
-start = timestamp - (timestamp - self.offset) % self.period
-return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
-for s in range(start, start - self.size, -self.period)]
+start = timestamp - ((timestamp - self.offset) % self.period)
+return [
+IntervalWindow(Timestamp(micros=s), Timestamp(micros=s) + self.size)
+for s in range(start.micros, timestamp.micros - self.size.micros,
+   -self.period.micros)]
 
   def __eq__(self, other):
 if type(self) == type(other) == SlidingWindows:

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window_test.py
--
diff --git a/sdks/python/apache_beam/transforms/window_test.py 
b/sdks/python/apache_beam/transforms/window_test.py
index 1ac95e4..cbfd0b2 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -108,6 +108,20 @@ class WindowTest(unittest.TestCase):
 self.assertEqual(expected, windowfn.assign(context('v', 8)))
 self.assertEqual(expected, windowfn.assign(context('v', 11)))
 
+  def test_sliding_windows_assignment_fraction(self):
+windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5)
+self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+ windowfn.assign(context('v', 1.7)))
+self.assertEqual([IntervalWindow(1.5, 5.0)],
+ windowfn.assign(context('v', 3)))
+
+  def test_sliding_windows_assignment_fraction_large_offset(self):
+windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0)
+self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+ windowfn.assign(context('v', 1.7)))
+self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)],
+ windowfn.assign(context('v', 4.5)))
+
   def test_sessions_merging(self):
 windowfn = Sessions(10)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/utils/timestamp.py
--
diff --git a/sdks/python/apache_beam/utils/timestamp.py 
b/sdks/python/apache_beam/utils/timestamp.py
index 647f4bd..8b2ccda 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -167,10 +167,6 @@ class Duration(object):
 # Note that the returned value may have lost precision.
 return float(self.micros) / 100
 
-  def __int__(self):
-# Note that the returned value may have lost precision.
-return self.micros / 100
-
   def __cmp__(self, other):
 # Allow comparisons between Duration and Timestamp values.
 if not isinstance(other, Timestamp):



[2/2] beam git commit: Closes #2600

2017-04-20 Thread robertwb
Closes #2600


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e0d5f59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e0d5f59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e0d5f59

Branch: refs/heads/master
Commit: 4e0d5f59691721b3b3b9766a294c1d6af5589e43
Parents: 4e0c833 1bc1bdd
Author: Robert Bradshaw 
Authored: Thu Apr 20 08:53:56 2017 -0700
Committer: Robert Bradshaw 
Committed: Thu Apr 20 08:53:56 2017 -0700

--
 sdks/python/apache_beam/transforms/window.py  | 10 ++
 sdks/python/apache_beam/transforms/window_test.py | 14 ++
 sdks/python/apache_beam/utils/timestamp.py|  4 
 3 files changed, 20 insertions(+), 8 deletions(-)
--




[GitHub] beam pull request #2595: [Beam-115] More complete translation of the graph t...

2017-04-19 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2595

[Beam-115] More complete translation of the graph through the Runner API

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam py-runner-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2595


commit d5b5c5ca23fce24e3c7028836641737609274596
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-18T22:29:04Z

Per-transform runner api dispatch.

commit dcf2bda4cbad8d2f1bbaa9f650fe59cd40522b5e
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-18T22:51:50Z

Translate flatten to Runner API.

commit d8d96772a60d53c4ffdaa480fd0bee575c8ae4df
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-18T23:07:32Z

Translate WindowInto through the Runner API.

commit 41a7bb060ecc46182252469fe2c055562a72a6a0
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-19T00:01:53Z

Translate Reads through the Runner API.

commit 05da3fd47701434f004a9d7ad8c5b80261d3c11d
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-19T05:46:25Z

Factor out common URN registration.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2586: Remove vestigial Read and Write from core.py

2017-04-18 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2586

Remove vestigial Read and Write from core.py

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam patch-7

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2586


commit e63aa0a104b89f0d0e24859227611e8f3235d559
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-18T23:23:51Z

Remove vestigial Read and Write from core.py




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2552: Various cleanups

2017-04-17 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2552

Various cleanups

I accumulated these while working on other code. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2552


commit 2fb4ce5798e8ad8f6255101a232dda5894dd9489
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T17:29:15Z

Make stage names consistent.

commit c56afa5854b62f081f1a132f7b7acd30ae8ff3b3
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T17:49:13Z

Require deterministic window coders.

commit 0fb2cabdb71eb5cf7d3ee42995906658eaf8e5d0
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T17:52:44Z

Enable IntervalWindowCoder test check.

commit 5a70fb678828db3687ce7ec0b1eedee5a2bce8b0
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T17:57:35Z

Remove obsolete and unused Runner.clear

commit 5629453d243c39ead63da5b9b75c122db660474b
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T18:00:04Z

Rename AfterFirst to AfterAny for consistency with Java.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Ignore more python build artifacts.

2017-04-11 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 77712c936 -> a5a5bf946


Ignore more python build artifacts.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1683578
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1683578
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1683578

Branch: refs/heads/master
Commit: a168357885b17bde688a548ed1fa12231a49658d
Parents: 77712c9
Author: Robert Bradshaw 
Authored: Tue Apr 11 11:49:16 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Apr 11 11:49:16 2017 -0700

--
 .gitignore | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a1683578/.gitignore
--
diff --git a/.gitignore b/.gitignore
index bc9f675..69946a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,9 @@ build/
 dist/
 distribute-*
 env/
+sdks/python/**/*.c
+sdks/python/**/*.so
+sdks/python/**/*.egg
 
 # Ignore IntelliJ files.
 .idea/



[2/2] beam git commit: Closes #2494

2017-04-11 Thread robertwb
Closes #2494


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5a5bf94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5a5bf94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5a5bf94

Branch: refs/heads/master
Commit: a5a5bf946f69d75897279f66aeeb0dcea3babfb2
Parents: 77712c9 a168357
Author: Robert Bradshaw 
Authored: Tue Apr 11 12:12:52 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Apr 11 12:12:52 2017 -0700

--
 .gitignore | 3 +++
 1 file changed, 3 insertions(+)
--




[GitHub] beam pull request #2494: Ignore more python build artifacts.

2017-04-11 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2494

Ignore more python build artifacts.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam git-ignore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2494


commit a168357885b17bde688a548ed1fa12231a49658d
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-11T18:49:16Z

Ignore more python build artifacts.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2437: Use absolute import for dataflow iobase test.

2017-04-05 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2437

Use absolute import for dataflow iobase test.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam patch-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2437.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2437


commit 1ae5fee58bc494bc072cde5d6e6cc7f7cda69e4b
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-05T18:55:20Z

Use absolute import for dataflow iobase test.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Closes #2435

2017-04-05 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master bc907c58b -> e32a0252f


Closes #2435


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e32a0252
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e32a0252
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e32a0252

Branch: refs/heads/master
Commit: e32a0252fca4ac7011b555d1aa14b1e098ac52a5
Parents: bc907c5 3eef246
Author: Robert Bradshaw 
Authored: Wed Apr 5 10:46:29 2017 -0700
Committer: Robert Bradshaw 
Committed: Wed Apr 5 10:46:29 2017 -0700

--
 sdks/python/apache_beam/examples/wordcount.py   |  32 ++--
 .../apache_beam/internal/gcp/json_value.py  |   6 -
 sdks/python/apache_beam/io/filebasedsource.py   |  54 ++
 .../apache_beam/io/filebasedsource_test.py  |  24 ---
 sdks/python/apache_beam/io/fileio.py|  56 ++-
 sdks/python/apache_beam/io/fileio_test.py   |  45 ++---
 .../runners/dataflow/internal/apiclient.py  |   1 -
 .../apache_beam/runners/direct/direct_runner.py |   9 -
 sdks/python/apache_beam/transforms/display.py   |   1 -
 .../apache_beam/transforms/display_test.py  |  36 
 .../apache_beam/utils/pipeline_options.py   |  92 +--
 .../apache_beam/utils/pipeline_options_test.py  |  52 +-
 sdks/python/apache_beam/utils/value_provider.py | 110 -
 .../apache_beam/utils/value_provider_test.py| 165 ---
 14 files changed, 65 insertions(+), 618 deletions(-)
--




[2/2] beam git commit: Revert "Add ValueProvider class for FileBasedSource I/O Transforms"

2017-04-05 Thread robertwb
Revert "Add ValueProvider class for FileBasedSource I/O Transforms"

This reverts commit 1e2168a127fb3047fb15d231a001bbf951892e11.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eef246f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eef246f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eef246f

Branch: refs/heads/master
Commit: 3eef246f761f92c626541e9008f8624b43bdcc09
Parents: bc907c5
Author: Ahmet Altay 
Authored: Wed Apr 5 10:03:14 2017 -0700
Committer: Robert Bradshaw 
Committed: Wed Apr 5 10:46:29 2017 -0700

--
 sdks/python/apache_beam/examples/wordcount.py   |  32 ++--
 .../apache_beam/internal/gcp/json_value.py  |   6 -
 sdks/python/apache_beam/io/filebasedsource.py   |  54 ++
 .../apache_beam/io/filebasedsource_test.py  |  24 ---
 sdks/python/apache_beam/io/fileio.py|  56 ++-
 sdks/python/apache_beam/io/fileio_test.py   |  45 ++---
 .../runners/dataflow/internal/apiclient.py  |   1 -
 .../apache_beam/runners/direct/direct_runner.py |   9 -
 sdks/python/apache_beam/transforms/display.py   |   1 -
 .../apache_beam/transforms/display_test.py  |  36 
 .../apache_beam/utils/pipeline_options.py   |  92 +--
 .../apache_beam/utils/pipeline_options_test.py  |  52 +-
 sdks/python/apache_beam/utils/value_provider.py | 110 -
 .../apache_beam/utils/value_provider_test.py| 165 ---
 14 files changed, 65 insertions(+), 618 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3eef246f/sdks/python/apache_beam/examples/wordcount.py
--
diff --git a/sdks/python/apache_beam/examples/wordcount.py 
b/sdks/python/apache_beam/examples/wordcount.py
index 27b9dcb..50c0328 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import argparse
 import logging
 import re
 
@@ -66,29 +67,24 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-  class WordcountOptions(PipelineOptions):
-@classmethod
-def _add_argparse_args(cls, parser):
-  parser.add_value_provider_argument(
-  '--input',
-  dest='input',
-  default='gs://dataflow-samples/shakespeare/kinglear.txt',
-  help='Input file to process.')
-  parser.add_value_provider_argument(
-  '--output',
-  dest='output',
-  required=True,
-  help='Output file to write results to.')
-  pipeline_options = PipelineOptions(argv)
-  wordcount_options = pipeline_options.view_as(WordcountOptions)
-
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+  dest='input',
+  default='gs://dataflow-samples/shakespeare/kinglear.txt',
+  help='Input file to process.')
+  parser.add_argument('--output',
+  dest='output',
+  required=True,
+  help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
   # We use the save_main_session option because one or more DoFn's in this
   # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(wordcount_options.input)
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -103,7 +99,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(wordcount_options.output)
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/3eef246f/sdks/python/apache_beam/internal/gcp/json_value.py
--
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py 
b/sdks/python/apache_beam/internal/gcp/json_value.py
index 4099c1a..c8b5393 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -25,8 +25,6 @@ except ImportError:
   extra_types = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
-from apache_beam.utils.value_provider import 

[1/2] beam git commit: Closes #2432

2017-04-05 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master e2a2836ad -> 8e5cfdea9


Closes #2432


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e5cfdea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e5cfdea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e5cfdea

Branch: refs/heads/master
Commit: 8e5cfdea9d331b2bfac0dd054bafc7f3f5295b48
Parents: e2a2836 5436081
Author: Robert Bradshaw 
Authored: Wed Apr 5 09:54:48 2017 -0700
Committer: Robert Bradshaw 
Committed: Wed Apr 5 09:54:48 2017 -0700

--
 sdks/python/apache_beam/pipeline.py   | 1 +
 sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++
 2 files changed, 7 insertions(+)
--




[2/2] beam git commit: Update refcounts after pipeline reconstruction.

2017-04-05 Thread robertwb
Update refcounts after pipeline reconstruction.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54360814
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54360814
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54360814

Branch: refs/heads/master
Commit: 54360814fbc59f69e10f4bac52cfeeea5e044cf9
Parents: e2a2836
Author: Robert Bradshaw 
Authored: Wed Apr 5 08:37:33 2017 -0700
Committer: Robert Bradshaw 
Committed: Wed Apr 5 09:54:48 2017 -0700

--
 sdks/python/apache_beam/pipeline.py   | 1 +
 sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++
 2 files changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 8506b85..fdb9a9d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -546,4 +546,5 @@ class AppliedPTransform(object):
 if pc not in result.inputs:
   pc.producer = result
   pc.tag = tag
+result.update_input_refcounts()
 return result

http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/transforms/ptransform_test.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py 
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 37ff2a8..5889ab5 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -435,6 +435,12 @@ class PTransformTest(unittest.TestCase):
 assert_that(result, equal_to([]))
 pipeline.run()
 
+  def test_flatten_same_pcollections(self):
+pipeline = TestPipeline()
+pc = pipeline | beam.Create(['a', 'b'])
+assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3))
+pipeline.run()
+
   def test_flatten_pcollections_in_iterable(self):
 pipeline = TestPipeline()
 pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])



[GitHub] beam pull request #2432: Update refcounts after pipeline reconstruction.

2017-04-05 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2432

Update refcounts after pipeline reconstruction.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam flatten

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2432


commit 6a337db9264b9179d0c77aff4d94e89267be838d
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-05T15:37:33Z

Update refcounts after pipeline reconstruction.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Closes #2400

2017-04-04 Thread robertwb
Closes #2400


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50fc63a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50fc63a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50fc63a9

Branch: refs/heads/master
Commit: 50fc63a9b52c9c6f20082ed68ee3dc47966e0d32
Parents: 22ff898 a41
Author: Robert Bradshaw 
Authored: Tue Apr 4 11:26:54 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Apr 4 11:26:54 2017 -0700

--
 sdks/python/apache_beam/pipeline.py | 5 +
 1 file changed, 5 insertions(+)
--




[1/2] beam git commit: Avoid Runner API translation of pipelines with PDone.

2017-04-04 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 22ff898a3 -> 50fc63a9b


Avoid Runner API translation of pipelines with PDone.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a419
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a419
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a419

Branch: refs/heads/master
Commit: a419a8e58cafced10232ff56094a58fe17ce
Parents: 22ff898
Author: Robert Bradshaw 
Authored: Sat Apr 1 12:42:40 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Apr 4 11:26:53 2017 -0700

--
 sdks/python/apache_beam/pipeline.py | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a419/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 3c416eb..8506b85 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -318,6 +318,11 @@ class Pipeline(object):
   pickler.loads(pickler.dumps(transform_node.transform))
 except Exception:
   Visitor.ok = False
+
+  def visit_value(self, value, _):
+if isinstance(value, pvalue.PDone):
+  Visitor.ok = False
+
 self.visit(Visitor())
 return Visitor.ok
 



[GitHub] beam pull request #2400: [BEAM-1843] Avoid Runner API translation of pipelin...

2017-04-01 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2400

[BEAM-1843] Avoid Runner API translation of pipelines with PDone.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam pdone

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2400


commit c4ac34dc40387b752c73a7540c09cda2ceff6833
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-04-01T19:42:40Z

Avoid Runner API translation of pipelines with PDone.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Fix side inputs on dataflow runner.

2017-03-31 Thread robertwb
Fix side inputs on dataflow runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07daf3a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07daf3a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07daf3a5

Branch: refs/heads/master
Commit: 07daf3a54544ce842165ffe15264e43ebced28ba
Parents: 60901f8
Author: Robert Bradshaw 
Authored: Fri Mar 31 14:57:44 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 21:45:16 2017 -0700

--
 .../apache_beam/runners/dataflow/dataflow_runner.py| 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/07daf3a5/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index db433df..fe9f8c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -287,7 +287,7 @@ class DataflowRunner(PipelineRunner):
   def _add_singleton_step(self, label, full_label, tag, input_step):
 """Creates a CollectionToSingleton step used to handle ParDo side 
inputs."""
 # Import here to avoid adding the dependency for local running scenarios.
-from google.cloud.dataflow.internal import apiclient
+from apache_beam.runners.dataflow.internal import apiclient
 step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
 self.job.proto.steps.append(step.proto)
 step.add_property(PropertyNames.USER_NAME, full_label)
@@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner):
 [{PropertyNames.USER_NAME: (
 '%s.%s' % (full_label, PropertyNames.OUTPUT)),
   PropertyNames.ENCODING: step.encoding,
-  PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+  PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 return step
 
   def run_Flatten(self, transform_node):
@@ -374,12 +374,10 @@ class DataflowRunner(PipelineRunner):
 si_dict = {}
 # We must call self._cache.get_pvalue exactly once due to refcounting.
 si_labels = {}
-for side_pval in transform_node.side_inputs:
-  si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
 lookup_label = lambda side_pval: si_labels[side_pval]
 for side_pval in transform_node.side_inputs:
   assert isinstance(side_pval, AsSideInput)
-  si_label = self._get_unique_step_name()
+  si_label = 'SideInput-' + self._get_unique_step_name()
   si_full_label = '%s/%s' % (transform_node.full_label, si_label)
   self._add_singleton_step(
   si_label, si_full_label, side_pval.pvalue.tag,
@@ -388,10 +386,13 @@ class DataflowRunner(PipelineRunner):
   '@type': 'OutputReference',
   PropertyNames.STEP_NAME: si_label,
   PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
+  si_labels[side_pval] = si_label
 
 # Now create the step for the ParDo transform being handled.
 step = self._add_step(
-TransformNames.DO, transform_node.full_label, transform_node,
+TransformNames.DO,
+transform_node.full_label + '/Do' if transform_node.side_inputs else 
'',
+transform_node,
 transform_node.transform.side_output_tags)
 fn_data = self._pardo_fn_data(transform_node, lookup_label)
 step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))



[1/2] beam git commit: Closes #2395

2017-03-31 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 60901f876 -> 03dce6dcc


Closes #2395


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03dce6dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03dce6dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03dce6dc

Branch: refs/heads/master
Commit: 03dce6dccef50210dd420cb9f79baf063bc0b0e8
Parents: 60901f8 07daf3a
Author: Robert Bradshaw 
Authored: Fri Mar 31 21:45:16 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 21:45:16 2017 -0700

--
 .../apache_beam/runners/dataflow/dataflow_runner.py| 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--




[2/3] beam git commit: Only encode PCollection outputs in Runner API protos.

2017-03-31 Thread robertwb
Only encode PCollection outputs in Runner API protos.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ff44af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ff44af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ff44af

Branch: refs/heads/master
Commit: c9ff44afa3fcb47b7f0c4288f4f7d520f063d442
Parents: 0749982
Author: Robert Bradshaw 
Authored: Fri Mar 31 16:57:01 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 21:44:21 2017 -0700

--
 sdks/python/apache_beam/pipeline.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c9ff44af/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index ee5904b..0841e5f 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -494,6 +494,10 @@ class AppliedPTransform(object):
 return {str(ix): input for ix, input in enumerate(self.inputs)
 if isinstance(input, pvalue.PCollection)}
 
+  def named_outputs(self):
+return {str(tag): output for tag, output in self.outputs.items()
+if isinstance(output, pvalue.PCollection)}
+
   def to_runner_api(self, context):
 from apache_beam.runners.api import beam_runner_api_pb2
 return beam_runner_api_pb2.PTransform(
@@ -507,7 +511,7 @@ class AppliedPTransform(object):
 inputs={tag: context.pcollections.get_id(pc)
 for tag, pc in self.named_inputs().items()},
 outputs={str(tag): context.pcollections.get_id(out)
- for tag, out in self.outputs.items()},
+ for tag, out in self.named_outputs().items()},
 # TODO(BEAM-115): display_data
 display_data=None)
 



[3/3] beam git commit: Closes #2396

2017-03-31 Thread robertwb
Closes #2396


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/60901f87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/60901f87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/60901f87

Branch: refs/heads/master
Commit: 60901f876aaf9e85c11501557516f9d47b87d994
Parents: 0749982 bd40c5a
Author: Robert Bradshaw 
Authored: Fri Mar 31 21:44:22 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 21:44:22 2017 -0700

--
 sdks/python/apache_beam/pipeline.py | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--




[1/3] beam git commit: Ensure transforms are picklable before materializing to protos.

2017-03-31 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 07499824b -> 60901f876


Ensure transforms are picklable before materializing to protos.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd40c5a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd40c5a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd40c5a2

Branch: refs/heads/master
Commit: bd40c5a27e379914914a30f79854fd7a38621c66
Parents: c9ff44a
Author: Robert Bradshaw 
Authored: Fri Mar 31 16:57:22 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 21:44:21 2017 -0700

--
 sdks/python/apache_beam/pipeline.py | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/bd40c5a2/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 0841e5f..3c416eb 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -311,6 +311,12 @@ class Pipeline(object):
 
   def visit_transform(self, transform_node):
 if transform_node.side_inputs:
+  # No side inputs (yet).
+  Visitor.ok = False
+try:
+  # Transforms must be picklable.
+  pickler.loads(pickler.dumps(transform_node.transform))
+except Exception:
   Visitor.ok = False
 self.visit(Visitor())
 return Visitor.ok



[GitHub] beam pull request #2396: [BEAM-1843] Ensure transforms are picklable before ...

2017-03-31 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2396

[BEAM-1843] Ensure transforms are picklable before materializing to protos.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam native-write

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2396


commit c6abd8ea8260ff51614d9b309975975adc4cf32e
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-31T23:30:56Z

Ensure transforms are picklable before materializing to protos.

See BEAM-1843.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2395: [BEAM-1853] Fix side inputs on dataflow runner.

2017-03-31 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2395

[BEAM-1853] Fix side inputs on dataflow runner.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam side-inputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2395


commit 3aa157a9be4f5bf8e390fcb3acf37d8bdb250954
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-31T21:57:44Z

Fix side inputs on dataflow runner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Closes #2388

2017-03-31 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 132d3c5f6 -> affb926cc


Closes #2388


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/affb926c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/affb926c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/affb926c

Branch: refs/heads/master
Commit: affb926ccd9302734805d8c0db418d006ee37672
Parents: 132d3c5 207de81
Author: Robert Bradshaw 
Authored: Fri Mar 31 12:11:00 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 31 12:11:00 2017 -0700

--
 sdks/python/apache_beam/pipeline.py |  12 +-
 sdks/python/apache_beam/pvalue.py   | 258 +++
 sdks/python/apache_beam/pvalue_test.py  |  33 ---
 .../runners/dataflow/dataflow_runner.py |  29 ++-
 .../runners/direct/bundle_factory.py|   3 +-
 .../consumer_tracking_pipeline_visitor.py   |  11 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 .../runners/direct/evaluation_context.py|  60 +++--
 .../apache_beam/runners/direct/executor.py  |   7 +-
 .../runners/direct/transform_evaluator.py   |  51 +---
 sdks/python/apache_beam/transforms/core.py  |   2 +-
 .../python/apache_beam/transforms/ptransform.py |   4 +-
 .../python/apache_beam/transforms/sideinputs.py | 132 --
 .../apache_beam/transforms/sideinputs_test.py   |  91 +++
 14 files changed, 198 insertions(+), 499 deletions(-)
--




[2/2] beam git commit: Change side inputs to be references rather than full PValues.

2017-03-31 Thread robertwb
Change side inputs to be references rather than full PValues.

This is more consistent with the Runner API's structure.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/207de81b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/207de81b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/207de81b

Branch: refs/heads/master
Commit: 207de81bca4c3761cf663d32f9b95a022ef97165
Parents: 132d3c5
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Mar 30 08:20:21 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Mar 31 12:11:00 2017 -0700

--
 sdks/python/apache_beam/pipeline.py |  12 +-
 sdks/python/apache_beam/pvalue.py   | 258 +++
 sdks/python/apache_beam/pvalue_test.py  |  33 ---
 .../runners/dataflow/dataflow_runner.py |  29 ++-
 .../runners/direct/bundle_factory.py|   3 +-
 .../consumer_tracking_pipeline_visitor.py   |  11 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 .../runners/direct/evaluation_context.py|  60 +++--
 .../apache_beam/runners/direct/executor.py  |   7 +-
 .../runners/direct/transform_evaluator.py   |  51 +---
 sdks/python/apache_beam/transforms/core.py  |   2 +-
 .../python/apache_beam/transforms/ptransform.py |   4 +-
 .../python/apache_beam/transforms/sideinputs.py | 132 --
 .../apache_beam/transforms/sideinputs_test.py   |  91 +++
 14 files changed, 198 insertions(+), 499 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index be2a79d..ee5904b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -142,9 +142,6 @@ class Pipeline(object):
 # If a transform is applied and the full label is already in the set
 # then the transform will have to be cloned with a new label.
 self.applied_labels = set()
-# Store cache of views created from PCollections.  For reference, see
-# pvalue._cache_view().
-self._view_cache = {}
 
   def _current_transform(self):
 """Returns the transform currently on the top of the stack."""
@@ -271,8 +268,8 @@ class Pipeline(object):
     result.producer = current
   # TODO(robertwb): Multi-input, multi-output inference.
   # TODO(robertwb): Ideally we'd do intersection here.
-  if (type_options is not None and type_options.pipeline_type_check and
-  isinstance(result, (pvalue.PCollection, pvalue.PCollectionView))
+  if (type_options is not None and type_options.pipeline_type_check
+  and isinstance(result, pvalue.PCollection)
   and not result.element_type):
 input_element_type = (
 inputs[0].element_type
@@ -416,7 +413,7 @@ class AppliedPTransform(object):
 if not isinstance(main_input, pvalue.PBegin):
   real_producer(main_input).refcounts[main_input.tag] += 1
   for side_input in self.side_inputs:
-real_producer(side_input).refcounts[side_input.tag] += 1
+real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1
 
   def add_output(self, output, tag=None):
 if isinstance(output, pvalue.DoOutputsTuple):
@@ -456,7 +453,8 @@ class AppliedPTransform(object):
 
 # Visit side inputs.
 for pval in self.side_inputs:
-  if isinstance(pval, pvalue.PCollectionView) and pval not in visited:
+  if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
+pval = pval.pvalue  # Unpack marker-object-wrapped pvalue.
 assert pval.producer is not None
 pval.producer.visit(visitor, pipeline, visited)
 # The value should be visited now since we visit outputs too.

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue.py
--
diff --git a/sdks/python/apache_beam/pvalue.py 
b/sdks/python/apache_beam/pvalue.py
index 4114b3f..bfe1745 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -26,9 +26,10 @@ produced when the pipeline gets executed.
 
 from __future__ import absolute_import
 
-import collections
 import itertools
 
+from apache_beam import typehints
+
 
 class PValue(object):
   """Base class for PCollection.
@@ -250,20 +251,22 @@ class SideOutputValue(object):
 self.value = value
 
 
-class PCollectionView(PValue):
-  """An immutable view of a PCollection that can be used as a side input."""
+class AsSideInput(object):
+  ""&qu

[GitHub] beam pull request #2388: Change side inputs to be references rather than ful...

2017-03-30 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2388

Change side inputs to be references rather than full PValues.

This is more consistent with the Runner API's structure.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam side-inputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2388


commit 85c60920325b8c06f0120fa8be2a2020d4cbdff4
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-30T15:20:21Z

Change side inputs to be references rather than full PValues.

This is more consistent with the Runner API's structure.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Translate pipeline graph to and from Runner API protos.

2017-03-30 Thread robertwb
lf.options.view_as(SetupOptions).save_main_session:
   # If this option is chosen, verify we can pickle the main session early.
   tmpdir = tempfile.mkdtemp()
@@ -299,6 +308,42 @@ class Pipeline(object):
 self.transforms_stack.pop()
 return pvalueish_result
 
+  def _verify_runner_api_compatible(self):
+class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
+  ok = True  # Really a nonlocal.
+
+  def visit_transform(self, transform_node):
+if transform_node.side_inputs:
+  Visitor.ok = False
+self.visit(Visitor())
+return Visitor.ok
+
+  def to_runner_api(self):
+from apache_beam.runners import pipeline_context
+from apache_beam.runners.api import beam_runner_api_pb2
+context = pipeline_context.PipelineContext()
+# Mutates context; placing inline would force dependence on
+# argument evaluation order.
+root_transform_id = context.transforms.get_id(self._root_transform())
+proto = beam_runner_api_pb2.Pipeline(
+root_transform_id=root_transform_id,
+components=context.to_runner_api())
+return proto
+
+  @staticmethod
+  def from_runner_api(proto, runner, options):
+p = Pipeline(runner=runner, options=options)
+from apache_beam.runners import pipeline_context
+context = pipeline_context.PipelineContext(proto.components)
+p.transforms_stack = [
+context.transforms.get_by_id(proto.root_transform_id)]
+# TODO(robertwb): These are only needed to continue construction. Omit?
+p.applied_labels = set([
+t.unique_name for t in proto.components.transforms.values()])
+for id in proto.components.pcollections:
+  context.pcollections.get_by_id(id).pipeline = p
+return p
+
 
 class PipelineVisitor(object):
   """Visitor pattern class used to traverse a DAG of transforms.
@@ -374,12 +419,16 @@ class AppliedPTransform(object):
 real_producer(side_input).refcounts[side_input.tag] += 1
 
   def add_output(self, output, tag=None):
-assert (isinstance(output, pvalue.PValue) or
-isinstance(output, pvalue.DoOutputsTuple))
-if tag is None:
-  tag = len(self.outputs)
-assert tag not in self.outputs
-self.outputs[tag] = output
+if isinstance(output, pvalue.DoOutputsTuple):
+  self.add_output(output[output._main_tag])
+elif isinstance(output, pvalue.PValue):
+  # TODO(BEAM-1833): Require tags when calling this method.
+  if tag is None and None in self.outputs:
+tag = len(self.outputs)
+  assert tag not in self.outputs
+  self.outputs[tag] = output
+else:
+  raise TypeError("Unexpected output type: %s" % output)
 
   def add_part(self, part):
 assert isinstance(part, AppliedPTransform)
@@ -441,3 +490,47 @@ class AppliedPTransform(object):
 if v not in visited:
   visited.add(v)
   visitor.visit_value(v, self)
+
+  def named_inputs(self):
+# TODO(BEAM-1833): Push names up into the sdk construction.
+return {str(ix): input for ix, input in enumerate(self.inputs)
+if isinstance(input, pvalue.PCollection)}
+
+  def to_runner_api(self, context):
+from apache_beam.runners.api import beam_runner_api_pb2
+return beam_runner_api_pb2.PTransform(
+unique_name=self.full_label,
+spec=beam_runner_api_pb2.UrnWithParameter(
+urn=urns.PICKLED_TRANSFORM,
+parameter=proto_utils.pack_Any(
+wrappers_pb2.BytesValue(value=pickler.dumps(self.transform,
+subtransforms=[context.transforms.get_id(part) for part in self.parts],
+# TODO(BEAM-115): Side inputs.
+inputs={tag: context.pcollections.get_id(pc)
+for tag, pc in self.named_inputs().items()},
+outputs={str(tag): context.pcollections.get_id(out)
+ for tag, out in self.outputs.items()},
+# TODO(BEAM-115): display_data
+display_data=None)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+result = AppliedPTransform(
+parent=None,
+transform=pickler.loads(
+proto_utils.unpack_Any(proto.spec.parameter,
+   wrappers_pb2.BytesValue).value),
+full_label=proto.unique_name,
+inputs=[
+context.pcollections.get_by_id(id) for id in 
proto.inputs.values()])
+result.parts = [
+context.transforms.get_by_id(id) for id in proto.subtransforms]
+result.outputs = {
+None if tag == 'None' else tag: context.pcollections.get_by_id(id)
+for tag, id in proto.outputs.items()}
+if not result.parts:
+  for tag, pc in result.outputs.items():
+if pc not in result.inputs:
+  pc.producer = result
+  pc.tag = tag
+return result

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline_test.py
--
diff --git 

[GitHub] beam pull request #2318: Add link to contributions guides to github README

2017-03-24 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/beam/pull/2318


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Allow customizing the GCE metadata service URL.

2017-03-24 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 9b8e16fca -> c80ef1837


Allow customizing the GCE metadata service URL.

The goal here is to allow a user to customize where a job finds the metadata
service; it would also be possible to do this programmatically (e.g. expose a
variable), but making it an environment variable allows a caller to do this
without needing to do so in-process.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7453766b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7453766b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7453766b

Branch: refs/heads/master
Commit: 7453766b0045fabcb8e6e683b30f7fa1a20b334c
Parents: 9b8e16f
Author: Craig Citro 
Authored: Fri Mar 24 13:38:50 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 24 14:47:19 2017 -0700

--
 sdks/python/apache_beam/internal/gcp/auth.py | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/7453766b/sdks/python/apache_beam/internal/gcp/auth.py
--
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py 
b/sdks/python/apache_beam/internal/gcp/auth.py
index ccc67c6..8304658 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -86,9 +86,11 @@ class GCEMetadataCredentials(OAuth2Credentials):
   retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def _refresh(self, http_request):
 refresh_time = datetime.datetime.now()
-req = urllib2.Request('http://metadata.google.internal/computeMetadata/v1/'
-  'instance/service-accounts/default/token',
-  headers={'Metadata-Flavor': 'Google'})
+metadata_root = os.environ.get(
+'GCE_METADATA_ROOT', 'metadata.google.internal')
+token_url = ('http://{}/computeMetadata/v1/instance/service-accounts/'
+ 'default/token').format(metadata_root)
+req = urllib2.Request(token_url, headers={'Metadata-Flavor': 'Google'})
 token_data = json.loads(urllib2.urlopen(req).read())
 self.access_token = token_data['access_token']
 self.token_expiry = (refresh_time +



[2/2] beam git commit: Closes #2319

2017-03-24 Thread robertwb
Closes #2319


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c80ef183
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c80ef183
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c80ef183

Branch: refs/heads/master
Commit: c80ef18376cb8c5b2cf8c1d9ac6000386d1eceb2
Parents: 9b8e16f 7453766
Author: Robert Bradshaw 
Authored: Fri Mar 24 14:47:20 2017 -0700
Committer: Robert Bradshaw 
Committed: Fri Mar 24 14:47:20 2017 -0700

--
 sdks/python/apache_beam/internal/gcp/auth.py | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)
--




[GitHub] beam pull request #2318: Add link to contributions guides to github README

2017-03-24 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2318

Add link to contributions guides to github README

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam patch-5

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2318.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2318


commit 950c0a36bc7e8046e67ee22089af902e40f24f4a
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-24T20:36:36Z

Add link to contributions guides to github README




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2296: [BEAM-115] Translate pipeline graph to and from Run...

2017-03-22 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2296

[BEAM-115] Translate pipeline graph to and from Runner API protos.

There are some caveats:
  * Specific known transforms, with their payloads, are not yet translated.
  * Side inputs are not yet supported.

All pipelines without side inputs are passed through this translation by
default before execution.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam py-runner-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2296


commit ebc74e9678050c632574b11afebf250b8332c25e
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-20T23:08:37Z

Translate pipeline graph to and from Runner API protos.

There are some caveats:
  * Specific known transforms, with their payloads, are not yet translated.
  * Side inputs are not yet supported.

All pipelines without side inputs are passed through this translation by
default before execution.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #176: Update quickstart-java.md

2017-03-21 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/beam-site/pull/176


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] beam git commit: Fix bad test obscured by falsely passing assert_that.

2017-03-21 Thread robertwb
Fix bad test obscured by falsely passing assert_that.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47da10cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47da10cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47da10cf

Branch: refs/heads/master
Commit: 47da10cfc21e3344bd198e572528a02fcf1caec3
Parents: c39b02d
Author: Robert Bradshaw 
Authored: Tue Mar 21 00:41:06 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Mar 21 10:38:37 2017 -0700

--
 sdks/python/apache_beam/transforms/window_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/47da10cf/sdks/python/apache_beam/transforms/window_test.py
--
diff --git a/sdks/python/apache_beam/transforms/window_test.py 
b/sdks/python/apache_beam/transforms/window_test.py
index 99be02c..11c8a68 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -142,7 +142,7 @@ class WindowTest(unittest.TestCase):
 
   def timestamped_key_values(self, pipeline, key, *timestamps):
 return (pipeline | 'start' >> Create(timestamps)
-| Map(lambda x: WindowedValue((key, x), x, [])))
+| Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()])))
 
   def test_sliding_windows(self):
 p = TestPipeline()



[1/3] beam git commit: Closes #2279

2017-03-21 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 030528f3d -> bea4f5aec


Closes #2279


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bea4f5ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bea4f5ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bea4f5ae

Branch: refs/heads/master
Commit: bea4f5aecce80c398e6a73f62263b454642826f1
Parents: 030528f 47da10c
Author: Robert Bradshaw 
Authored: Tue Mar 21 10:38:37 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Mar 21 10:38:37 2017 -0700

--
 sdks/python/apache_beam/transforms/util.py  | 18 ---
 sdks/python/apache_beam/transforms/util_test.py | 50 
 .../apache_beam/transforms/window_test.py   |  2 +-
 3 files changed, 63 insertions(+), 7 deletions(-)
--




[3/3] beam git commit: [BEAM-1768] Fix assert_that for empty inputs

2017-03-21 Thread robertwb
[BEAM-1768] Fix assert_that for empty inputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c39b02d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c39b02d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c39b02d9

Branch: refs/heads/master
Commit: c39b02d9ea2d343f348beca1228776204605fb93
Parents: 030528f
Author: Robert Bradshaw 
Authored: Mon Mar 20 18:20:01 2017 -0700
Committer: Robert Bradshaw 
Committed: Tue Mar 21 10:38:37 2017 -0700

--
 sdks/python/apache_beam/transforms/util.py  | 18 ---
 sdks/python/apache_beam/transforms/util_test.py | 50 
 2 files changed, 62 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c39b02d9/sdks/python/apache_beam/transforms/util.py
--
diff --git a/sdks/python/apache_beam/transforms/util.py 
b/sdks/python/apache_beam/transforms/util.py
index e3f5b85..ac7eb3c 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
 
 from apache_beam.transforms import window
 from apache_beam.transforms.core import CombinePerKey
+from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Flatten
 from apache_beam.transforms.core import GroupByKey
 from apache_beam.transforms.core import Map
@@ -222,12 +223,17 @@ def assert_that(actual, matcher, label='assert_that'):
   class AssertThat(PTransform):
 
 def expand(self, pcoll):
-  return (pcoll
-  | WindowInto(window.GlobalWindows())
-  | "ToVoidKey" >> Map(lambda v: (None, v))
-  | "Group" >> GroupByKey()
-  | "UnKey" >> Map(lambda (k, v): v)
-  | "Match" >> Map(matcher))
+  # We must have at least a single element to ensure the matcher
+  # code gets run even if the input pcollection is empty.
+  keyed_singleton = pcoll.pipeline | Create([(None, None)])
+  keyed_actual = (
+  pcoll
+  | WindowInto(window.GlobalWindows())
+  | "ToVoidKey" >> Map(lambda v: (None, v)))
+  _ = ((keyed_singleton, keyed_actual)
+   | "Group" >> CoGroupByKey()
+   | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+   | "Match" >> Map(matcher))
 
 def default_label(self):
   return label

http://git-wip-us.apache.org/repos/asf/beam/blob/c39b02d9/sdks/python/apache_beam/transforms/util_test.py
--
diff --git a/sdks/python/apache_beam/transforms/util_test.py 
b/sdks/python/apache_beam/transforms/util_test.py
new file mode 100644
index 000..9656827
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the util transforms."""
+
+import unittest
+
+from apache_beam import Create
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.transforms.util import assert_that, equal_to, is_empty
+
+
+class UtilTest(unittest.TestCase):
+
+  def test_assert_that_passes(self):
+with TestPipeline() as p:
+  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails(self):
+with self.assertRaises(Exception):
+  with TestPipeline() as p:
+assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_input(self):
+with self.assertRaises(Exception):
+  with TestPipeline() as p:
+assert_that(p | Create([]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_expected(self):
+with self.assertRaises(Exception):
+  with TestPipeline() as p:
+assert_that(p | Create([1, 2, 3]), is_empty())
+
+
+if __name__ == '__main__':
+  unittest.main()



[GitHub] beam pull request #2279: [BEAM-1768] Fix assert_that for empty inputs

2017-03-20 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2279

[BEAM-1768] Fix assert_that for empty inputs

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam passert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2279


commit f46ac4d1e92d2080a97e0f29d084816f1b97af54
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-21T01:20:01Z

[BEAM-1768] Fix assert_that for empty inputs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #176: Update quickstart-java.md

2017-03-13 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam-site/pull/176

Update quickstart-java.md

I had to make this change for a clean build to work with the 0.6.0 source 
release.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam-site patch-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/176.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #176


commit 38e76e8a36d2f4c4c7bd7376d55dd8678e42d978
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-13T20:26:56Z

Update quickstart-java.md

I had to make this change for a clean build to work with the 0.6.0 source 
release.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2233: Better error on missing gcp dependencies.

2017-03-13 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2233

Better error on missing gcp dependencies.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam gcs-error

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2233.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2233


commit 00a3817463b07f7fb58f536ee354483ebb1012fd
Author: Robert Bradshaw <rober...@google.com>
Date:   2017-03-13T18:18:50Z

Better error on missing gcp dependencies.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[7/9] beam git commit: Runner API encoding of WindowFns.

2017-03-09 Thread robertwb
Runner API encoding of WindowFns.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a

Branch: refs/heads/master
Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f
Parents: bc76a18
Author: Robert Bradshaw 
Authored: Tue Mar 7 12:21:02 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Mar 9 20:29:01 2017 -0800

--
 sdks/python/apache_beam/transforms/window.py| 117 +++
 .../apache_beam/transforms/window_test.py   |  11 ++
 sdks/python/apache_beam/utils/proto_utils.py|  37 ++
 sdks/python/apache_beam/utils/urns.py   |   7 ++
 4 files changed, 172 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py
--
diff --git a/sdks/python/apache_beam/transforms/window.py 
b/sdks/python/apache_beam/transforms/window.py
index 14cf2f6..a562bcf 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,13 +49,20 @@ WindowFn.
 
 from __future__ import absolute_import
 
+from google.protobuf import struct_pb2
+from google.protobuf import wrappers_pb2
+
 from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.transforms.timeutil import Duration
 from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.timeutil import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
@@ -131,6 +138,41 @@ class WindowFn(object):
 # By default, just return the input timestamp.
 return input_timestamp
 
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+cls._known_urns[urn] = parameter_type, constructor
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+return constructor(
+proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+context)
+
+  def to_runner_api(self, context):
+urn, typed_param = self.to_runner_api_parameter(context)
+return beam_runner_api_pb2.FunctionSpec(
+spec=beam_runner_api_pb2.UrnWithParameter(
+urn=urn,
+parameter=proto_utils.pack_Any(typed_param)))
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+return pickler.loads(fn_parameter.value)
+
+  def to_runner_api_parameter(self, context):
+raise TypeError(self)
+return (urns.PICKLED_WINDOW_FN,
+wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+
+WindowFn.register_urn(
+urns.PICKLED_WINDOW_FN,
+wrappers_pb2.BytesValue,
+WindowFn.from_runner_api_parameter)
+
 
 class BoundedWindow(object):
   """A window for timestamps in range (-infinity, end).
@@ -251,6 +293,16 @@ class GlobalWindows(WindowFn):
   def __ne__(self, other):
 return not self == other
 
+  @staticmethod
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+return GlobalWindows()
+
+  def to_runner_api_parameter(self, context):
+return urns.GLOBAL_WINDOWS_FN, None
+
+WindowFn.register_urn(
+urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+
 
 class FixedWindows(WindowFn):
   """A windowing function that assigns each element to one time interval.
@@ -280,6 +332,29 @@ class FixedWindows(WindowFn):
   def merge(self, merge_context):
 pass  # No merging.
 
+  def __eq__(self, other):
+if type(self) == type(other) == FixedWindows:
+  return self.size == other.size and self.offset == other.offset
+
+  def __ne__(self, other):
+return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+return FixedWindows(
+size=Duration(micros=fn_parameter['size']),
+offset=Timestamp(micros=fn_parameter['offset']))
+
+  def to_runner_api_parameter(self, context):
+return (urns.FIXED_WINDOWS_FN,
+proto_utils.pack_Struct(size=self.size.micros,
+offset=self.offset.micros))
+
+WindowFn.register_urn(
+urns.FIXED_WINDOWS_FN,
+struct_pb2.Struct,
+FixedWindows.from_runner_api_parameter)
+
 
 class SlidingWindows(WindowFn):
   """A windowing function 

[8/9] beam git commit: Move pipeline context and add more tests.

2017-03-09 Thread robertwb
# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the windowing classes."""
+
+import unittest
+
+from apache_beam import coders
+from apache_beam.runners import pipeline_context
+
+
+class PipelineContextTest(unittest.TestCase):
+
+  def test_deduplication(self):
+context = pipeline_context.PipelineContext()
+bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
+self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
+
+  def test_serialization(self):
+context = pipeline_context.PipelineContext()
+float_coder_ref = context.coders.get_id(coders.FloatCoder())
+bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+proto = context.to_runner_api()
+context2 = pipeline_context.PipelineContext.from_runner_api(proto)
+self.assertEqual(
+coders.FloatCoder(),
+context2.coders.get_by_id(float_coder_ref))
+self.assertEqual(
+coders.BytesCoder(),
+context2.coders.get_by_id(bytes_coder_ref))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index 1fc63b2..3251671 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1235,6 +1235,7 @@ class Windowing(object):
 trigger=self.triggerfn.to_runner_api(context),
 accumulation_mode=self.accumulation_mode,
 output_time=self.output_time_fn,
+# TODO(robertwb): Support EMIT_IF_NONEMPTY
 closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
 allowed_lateness=0)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py
--
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py 
b/sdks/python/apache_beam/transforms/trigger_test.py
index cc9e0f5..827aa33 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,6 +25,7 @@ import unittest
 import yaml
 
 import apache_beam as beam
+from apache_beam.runners import pipeline_context
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
@@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase):
 AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
 Repeatedly(AfterCount(100)),
 trigger.OrFinally(AfterCount(3), AfterCount(10))):
-  context = beam.pipeline.PipelineContext()
-  self.assertEqual(
-  trigger_fn,
-  TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), 
context))
-
-  def test_windowing_strategy_encoding(self):
-for trigger_fn in (
-DefaultTrigger(),
-AfterAll(AfterCount(1), AfterCount(10)),
-AfterFirst(AfterCount(10), AfterCount(100)),
-AfterEach(AfterCount(100), AfterCount(1000)),
-AfterWatermark(early=AfterCount(1000)),
-AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
-Repeatedly(AfterCount(100)),
-trigger.OrFinally(AfterCount(3), AfterCount(10))):
-  context = beam.pipeline.PipelineContext()
+  context = pipeline_context.PipelineContext()
   self.assertEqual(
   trigger_fn,
   TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), 
context))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py
--
diff --git a/sdks/python/apache_beam/transforms/window.py 
b/sdks/python/apache_beam/transforms/window.py
index c763a96..3878dff 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -73,6 +73,7 @@ class OutputTimeFn(object):
   OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
   OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
   OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+  # TODO(robertwb): Add this to the runner API or remove it.
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
@@ -167,7 +168,6 @@ class WindowFn(object):
 return pickler.loads(fn_parameter.value)
 
   def to_runner_api_parameter(self, context):
-raise TypeError(self)
 return (urns.PICKLED_WINDOW_FN,
 wrappers_pb2.BytesValue(value=pickler.dumps(self)))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py

[3/9] beam git commit: Auto-generated runner api proto bindings.

2017-03-09 Thread robertwb
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
--
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py 
b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
new file mode 100644
index 000..66c331b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -0,0 +1,2755 @@
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: beam_runner_api.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf.internal import enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='beam_runner_api.proto',
+  package='org.apache.beam.runner_api.v1',
+  syntax='proto3',
+  
serialized_pb=_b('\n\x15\x62\x65\x61m_runner_api.proto\x12\x1dorg.apache.beam.runner_api.v1\x1a\x19google/protobuf/any.proto\"\x8d\x07\n\nComponents\x12M\n\ntransforms\x18\x01
 
\x03(\x0b\x32\x39.org.apache.beam.runner_api.v1.Components.TransformsEntry\x12Q\n\x0cpcollections\x18\x02
 
\x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.PcollectionsEntry\x12`\n\x14windowing_strategies\x18\x03
 
\x03(\x0b\x32\x42.org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry\x12\x45\n\x06\x63oders\x18\x04
 
\x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.Components.CodersEntry\x12Q\n\x0c\x65nvironments\x18\x05
 
\x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.EnvironmentsEntry\x1a\\\n\x0fTransformsEntry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\x38\n\x05value\x18\x02 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransform:\x02\x38\x01\x1a_\n\x11PcollectionsEntry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\x39\n\x05value\x18\x02 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollection:\x02\
 x38\x01\x1al\n\x18WindowingStrategiesEntry\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12?\n\x05value\x18\x02 
\x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategy:\x02\x38\x01\x1aS\n\x0b\x43odersEntry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\x33\n\x05value\x18\x02 
\x01(\x0b\x32$.org.apache.beam.runner_api.v1.Coder:\x02\x38\x01\x1a_\n\x11\x45nvironmentsEntry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\x39\n\x05value\x18\x02 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.Environment:\x02\x38\x01\"\xe4\x06\n\x15MessageWithComponents\x12=\n\ncomponents\x18\x01
 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x35\n\x05\x63oder\x18\x02
 
\x01(\x0b\x32$.org.apache.beam.runner_api.v1.CoderH\x00\x12H\n\x0f\x63ombine_payload\x18\x03
 
\x01(\x0b\x32-.org.apache.beam.runner_api.v1.CombinePayloadH\x00\x12\x44\n\rfunction_spec\x18\x04
 
\x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpecH\x00\x12\x45\n\x0epar_do_payload\x18\x06
 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.ParDoPayloadH\x00\x12?\
 n\nptransform\x18\x07 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransformH\x00\x12\x41\n\x0bpcollection\x18\x08
 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollectionH\x00\x12\x42\n\x0cread_payload\x18\t
 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.ReadPayloadH\x00\x12>\n\nside_input\x18\x0b
 
\x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInputH\x00\x12O\n\x13window_into_payload\x18\x0c
 
\x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowIntoPayloadH\x00\x12N\n\x12windowing_strategy\x18\r
 
\x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategyH\x00\x12M\n\x12urn_with_parameter\x18\x0e
 
\x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameterH\x00\x42\x06\n\x04root\"\xa6\x01\n\x08Pipeline\x12=\n\ncomponents\x18\x01
 
\x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x19\n\x11root_transform_id\x18\x02
 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x03 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xa8\x03\n\nPTransform\x12\x13\n\x0bunique_name\x18\x05
  \x01(\t\x12=\n\x04spec\x18\x01 
\x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x15\n\rsubtransforms\x18\x02
 \x03(\t\x12\x45\n\x06inputs\x18\x03 
\x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.PTransform.InputsEntry\x12G\n\x07outputs\x18\x04
 
\x03(\x0b\x32\x36.org.apache.beam.runner_api.v1.PTransform.OutputsEntry\x12@\n\x0c\x64isplay_data\x18\x06
 
\x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\x1a-\n\x0bInputsEntry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t:\x02\x38\x01\x1a.\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12\r\n\x05value\x18\x02 

[1/9] beam git commit: Runner API context helper classes.

2017-03-09 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master f13a84d67 -> 2c2424cb4


Runner API context helper classes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186

Branch: refs/heads/master
Commit: bc76a186099568ef292ceb007388ae7174150bc2
Parents: 3bb125e
Author: Robert Bradshaw 
Authored: Tue Mar 7 12:04:27 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Mar 9 20:29:00 2017 -0800

--
 sdks/python/apache_beam/pipeline.py | 62 
 1 file changed, 62 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/bc76a186/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 7db39a9..4ec2e47 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,11 +52,14 @@ import os
 import shutil
 import tempfile
 
+from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -440,3 +443,62 @@ class AppliedPTransform(object):
 if v not in visited:
   visited.add(v)
   visitor.visit_value(v, self)
+
+
+class PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+self._pipeline_context = context
+self._obj_type = obj_type
+self._obj_to_id = {}
+self._id_to_obj = {}
+self._id_to_proto = proto_map if proto_map else {}
+self._counter = 0
+
+  def _unique_ref(self):
+self._counter += 1
+return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+for id, obj in self._id_to_obj:
+  proto_map[id] = self._id_to_proto[id]
+
+  def get_id(self, obj):
+if obj not in self._obj_to_id:
+  id = self._unique_ref()
+  self._id_to_obj[id] = obj
+  self._obj_to_id[obj] = id
+  self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+if id not in self._id_to_obj:
+  self._id_to_obj[id] = self._obj_type.from_runner_api(
+self._id_to_proto[id], self._pipeline_context)
+return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+
+  _COMPONENT_TYPES = {
+'transforms': AppliedPTransform,
+'pcollections': pvalue.PCollection,
+'coders': coders.Coder,
+'windowing_strategies': core.Windowing,
+# TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+for name, cls in self._COMPONENT_TYPES.items():
+  setattr(self, name,
+  PipelineContextMap(self, cls, getattr(context_proto, name, 
None)))
+
+  def to_runner_api(self):
+context_proto = beam_runner_api_pb2.Components()
+for name, cls in self._COMPONENT_TYEPS:
+  getattr(self, name).populate_map(getattr(context_proto, name))
+return context_proto



[6/9] beam git commit: Runner API translation of triggers and windowing strategies.

2017-03-09 Thread robertwb
nd other._is_default:
+return True
+  else:
+return (
+self.windowfn == other.windowfn
+and self.triggerfn == other.triggerfn
+and self.accumulation_mode == other.accumulation_mode
+and self.output_time_fn == other.output_time_fn)
+
   def is_default(self):
 return self._is_default
 
+  def to_runner_api(self, context):
+return beam_runner_api_pb2.WindowingStrategy(
+window_fn=self.windowfn.to_runner_api(context),
+# TODO(robertwb): Prohibit implicit multi-level merging.
+merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+  if self.windowfn.is_merging()
+  else beam_runner_api_pb2.NON_MERGING),
+window_coder_id=context.coders.get_id(
+self.windowfn.get_window_coder()),
+trigger=self.triggerfn.to_runner_api(context),
+accumulation_mode=self.accumulation_mode,
+output_time=self.output_time_fn,
+closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+allowed_lateness=0)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+# pylint: disable=wrong-import-order, wrong-import-position
+from apache_beam.transforms.trigger import TriggerFn
+return Windowing(
+windowfn=WindowFn.from_runner_api(proto.window_fn, context),
+triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
+accumulation_mode=proto.accumulation_mode,
+output_time_fn=proto.output_time)
+
 
 @typehints.with_input_types(T)
 @typehints.with_output_types(T)

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py
--
diff --git a/sdks/python/apache_beam/transforms/trigger.py 
b/sdks/python/apache_beam/transforms/trigger.py
index 04198ba..b55d602 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import OutputTimeFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.runners.api import beam_runner_api_pb2
 
 
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times.
   """
-  DISCARDING = 1
-  ACCUMULATING = 2
+  DISCARDING = beam_runner_api_pb2.DISCARDING
+  ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
   # RETRACTING = 3
 
@@ -185,6 +186,26 @@ class TriggerFn(object):
 pass
 # pylint: enable=unused-argument
 
+  @staticmethod
+  def from_runner_api(proto, context):
+return {
+'after_all': AfterAll,
+'after_any': AfterFirst,
+'after_each': AfterEach,
+'after_end_of_widow': AfterWatermark,
+# after_processing_time, after_synchronized_processing_time
+# always
+'default': DefaultTrigger,
+'element_count': AfterCount,
+# never
+'or_finally': OrFinally,
+'repeat': Repeatedly,
+}[proto.WhichOneof('trigger')].from_runner_api(proto, context)
+
+  @abstractmethod
+  def to_runner_api(self, unused_context):
+pass
+
 
 class DefaultTrigger(TriggerFn):
   """Semantically Repeatedly(AfterWatermark()), but more optimized."""
@@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn):
   def __eq__(self, other):
 return type(self) == type(other)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+return DefaultTrigger()
+
+  def to_runner_api(self, unused_context):
+return beam_runner_api_pb2.Trigger(
+default=beam_runner_api_pb2.Trigger.Default())
+
 
 class AfterWatermark(TriggerFn):
   """Fire exactly once when the watermark passes the end of the window.
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn):
   def __repr__(self):
 qualifiers = []
 if self.early:
-  qualifiers.append('early=%s' % self.early)
+  qualifiers.append('early=%s' % self.early.underlying)
 if self.late:
-  qualifiers.append('late=%s' % self.late)
+  qualifiers.append('late=%s' % self.late.underlying)
 return 'AfterWatermark(%s)' % ', '.join(qualifiers)
 
   def is_late(self, context):
@@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn):
   def __hash__(self):
 return hash((type(self), self.early, self.late))
 
+  @staticmethod
+  def from_runner_api(proto, context):
+return AfterWatermark(
+early=TriggerFn.from_runner_api(
+proto.after_end_of_widow.early_firings, context)
+if proto.after_end_of_widow.HasField('early_firings')
+else None,
+late=TriggerFn.from_runner_api(
+proto.after_end_of_widow.late_firings, context)
+if proto.after_end_of_widow.HasField('late_firings')
+else 

[2/9] beam git commit: Auto-generated runner api proto bindings.

2017-03-09 Thread robertwb
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/run_pylint.sh
--
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index afc5fb4..5e63856 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -34,7 +34,8 @@ EXCLUDED_GENERATED_FILES=(
 
"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py"
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
-"apache_beam/coders/proto2_coder_test_messages_pb2.py")
+"apache_beam/coders/proto2_coder_test_messages_pb2.py"
+"apache_beam/runners/api/beam_runner_api_pb2.py")
 
 FILES_TO_IGNORE=""
 for file in "${EXCLUDED_GENERATED_FILES[@]}"; do



[9/9] beam git commit: Closes #2190

2017-03-09 Thread robertwb
Closes #2190


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c2424cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c2424cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c2424cb

Branch: refs/heads/master
Commit: 2c2424cb44bb2976ea9099230106a639b5ee3993
Parents: f13a84d deff128
Author: Robert Bradshaw 
Authored: Thu Mar 9 20:29:03 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Mar 9 20:29:03 2017 -0800

--
 sdks/python/apache_beam/coders/coders.py|  113 +
 sdks/python/apache_beam/runners/api/__init__.py |   16 +
 .../runners/api/beam_runner_api_pb2.py  | 2772 ++
 .../apache_beam/runners/pipeline_context.py |   88 +
 .../runners/pipeline_context_test.py|   49 +
 sdks/python/apache_beam/transforms/core.py  |   39 +
 sdks/python/apache_beam/transforms/trigger.py   |  143 +-
 .../apache_beam/transforms/trigger_test.py  |   19 +
 sdks/python/apache_beam/transforms/window.py|  147 +-
 .../apache_beam/transforms/window_test.py   |   32 +
 sdks/python/apache_beam/utils/proto_utils.py|   54 +
 sdks/python/apache_beam/utils/urns.py   |   24 +
 sdks/python/run_pylint.sh   |3 +-
 13 files changed, 3481 insertions(+), 18 deletions(-)
--




[5/9] beam git commit: Add license to new files.

2017-03-09 Thread robertwb
Add license to new files.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2da21e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2da21e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2da21e2

Branch: refs/heads/master
Commit: b2da21e287660bb3077bf89e092f7aa3c385906b
Parents: 5b86e1f
Author: Robert Bradshaw 
Authored: Wed Mar 8 13:47:53 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Mar 9 20:29:01 2017 -0800

--
 sdks/python/apache_beam/runners/api/__init__.py| 16 
 .../apache_beam/runners/api/beam_runner_api_pb2.py | 17 +
 sdks/python/apache_beam/utils/proto_utils.py   | 17 +
 sdks/python/apache_beam/utils/urns.py  | 17 +
 4 files changed, 67 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/__init__.py
--
diff --git a/sdks/python/apache_beam/runners/api/__init__.py 
b/sdks/python/apache_beam/runners/api/__init__.py
index e69de29..cce3aca 100644
--- a/sdks/python/apache_beam/runners/api/__init__.py
+++ b/sdks/python/apache_beam/runners/api/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
--
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py 
b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
index 66c331b..f235ce8 100644
--- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: beam_runner_api.proto
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/proto_utils.py
--
diff --git a/sdks/python/apache_beam/utils/proto_utils.py 
b/sdks/python/apache_beam/utils/proto_utils.py
index 0ece8f5..b4bfdca 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from google.protobuf import any_pb2
 from google.protobuf import struct_pb2
 


[GitHub] beam pull request #2214: Ensure that assert_that takes a PCollection as its ...

2017-03-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2214

Ensure that assert_that takes a PCollection as its first argument.

This can avoid (silent) errors such as

with test_pipeline.TestPipeline() as p:
  bad_value = "Create0" >> beam.Create([1])  # Note missing "p | "
  beam_util.assert_that(bad_value, beam_util.equal_to([0]))

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam assert-that

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2214


commit 479cc0181daa3b98ef6ccdc8c3c1512bc2e3efae
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-10T00:18:46Z

Ensure that assert_that takes a PCollection as its first argument.

This can avoid (silent) errors such as

with test_pipeline.TestPipeline() as p:
  bad_value = "Create0" >> beam.Create([1])  # Note missing "p | "
  beam_util.assert_that(bad_value, beam_util.equal_to([0]))




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2191: Fix typo in proto: widow -> window.

2017-03-07 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2191

Fix typo in proto: widow -> window.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam widow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2191


commit 558731978ae8e62c59130314de9db57d253ba10f
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-08T01:36:41Z

Fix typo in proto: widow -> window.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2190: [BEAM-115] Runner API representation of windowing s...

2017-03-07 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2190

[BEAM-115] Runner API representation of windowing strategies for Python

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam py-runner-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2190


commit 356266b26684ad7e6846eaba33f6744f365890cf
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-07T20:02:08Z

Auto-generated runner api proto bindings.

commit e18be9cb1dd665a10b7250209c28d10600614bdb
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-07T20:04:27Z

Runner API context helper classes.

commit 0624235719bb9f813e620939fc0e11ac713708cb
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-07T20:21:02Z

Runner API encoding of WindowFns.

commit 243ba920ee1682f8c7863c339b7d057c9fecb14c
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-03-08T00:18:02Z

Runner API translation of triggers and windowing strategies.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: Updates Python SDK source API so that sources can report limited parallelism signals.

2017-03-02 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master de9e8528c -> 8f5f19d11


Updates Python SDK source API so that sources can report limited parallelism 
signals.

With this update, the Python BoundedSource/RangeTracker API can report the
consumed and remaining number of split points while performing source read
operations, similar to Java SDK sources.

These signals can be used by runner implementations, for example, to make
scaling decisions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a7ae44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a7ae44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a7ae44

Branch: refs/heads/master
Commit: 97a7ae449e1ccf6b08a0ee0bc2fc0a1b49924f1f
Parents: de9e852
Author: Chamikara Jayalath 
Authored: Wed Jan 4 19:10:09 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Mar 2 17:19:08 2017 -0800

--
 sdks/python/apache_beam/io/avroio.py|  33 +++-
 sdks/python/apache_beam/io/avroio_test.py   |  31 
 sdks/python/apache_beam/io/iobase.py| 157 ++-
 sdks/python/apache_beam/io/range_trackers.py|  41 -
 .../apache_beam/io/range_trackers_test.py   |  52 ++
 sdks/python/apache_beam/io/textio.py|  17 +-
 sdks/python/apache_beam/io/textio_test.py   |  13 ++
 .../runners/dataflow/native_io/iobase.py|  13 +-
 8 files changed, 342 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/97a7ae44/sdks/python/apache_beam/io/avroio.py
--
diff --git a/sdks/python/apache_beam/io/avroio.py 
b/sdks/python/apache_beam/io/avroio.py
index 5dab651..ab98530 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -28,6 +28,7 @@ from avro import schema
 import apache_beam as beam
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
+from apache_beam.io import iobase
 from apache_beam.io.iobase import Read
 from apache_beam.transforms import PTransform
 
@@ -135,6 +136,7 @@ class _AvroUtils(object):
   ValueError: If the block cannot be read properly because the file doesn't
 match the specification.
 """
+offset = f.tell()
 decoder = avroio.BinaryDecoder(f)
 num_records = decoder.read_long()
 block_size = decoder.read_long()
@@ -144,7 +146,8 @@ class _AvroUtils(object):
   raise ValueError('Unexpected sync marker (actual "%s" vs expected "%s"). 
'
'Maybe the underlying avro file is corrupted?',
sync_marker, expected_sync_marker)
-return _AvroBlock(block_bytes, num_records, codec, schema)
+size = f.tell() - offset
+return _AvroBlock(block_bytes, num_records, codec, schema, offset, size)
 
   @staticmethod
   def advance_file_past_next_sync_marker(f, sync_marker):
@@ -172,13 +175,22 @@ class _AvroUtils(object):
 class _AvroBlock(object):
   """Represents a block of an Avro file."""
 
-  def __init__(self, block_bytes, num_records, codec, schema_string):
+  def __init__(self, block_bytes, num_records, codec, schema_string,
+   offset, size):
 # Decompress data early on (if needed) and thus decrease the number of
 # parallel copies of the data in memory at any given in time during
 # block iteration.
 self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec)
 self._num_records = num_records
 self._schema = schema.parse(schema_string)
+self._offset = offset
+self._size = size
+
+  def size(self):
+return self._size
+
+  def offset(self):
+return self._offset
 
   @staticmethod
   def _decompress_bytes(data, codec):
@@ -232,12 +244,26 @@ class _AvroSource(filebasedsource.FileBasedSource):
   """
 
   def read_records(self, file_name, range_tracker):
+next_block_start = -1
+
+def split_points_unclaimed(stop_position):
+  if next_block_start >= stop_position:
+# Next block starts at or after the suggested stop position. Hence
+# there will not be split points to be claimed for the range ending at
+# suggested stop position.
+return 0
+
+  return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
+
+range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
 start_offset = range_tracker.start_position()
 if start_offset is None:
   start_offset = 0
 
 with self.open_file(file_name) as f:
-  codec, schema_string, sync_marker = 
_AvroUtils.read_meta_data_from_file(f)
+  codec, schema_string, sync_marker = _AvroUtils.read_meta_data_from_file(
+  f)
 
   # We have to start at current position if previous bundle ended at the
   # end of a 

[1/2] beam git commit: Inline rather than reference FunctionSpecs.

2017-03-01 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 79b1395c2 -> d84b06791


Inline rather than reference FunctionSpecs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d390406e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d390406e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d390406e

Branch: refs/heads/master
Commit: d390406e27112faed31233d7daef1f650a31cd0f
Parents: 79b1395
Author: Robert Bradshaw 
Authored: Tue Feb 28 15:51:24 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Mar 1 09:04:30 2017 -0800

--
 .../src/main/proto/beam_runner_api.proto| 39 +---
 .../beam/sdk/util/WindowingStrategies.java  | 18 ++---
 2 files changed, 20 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 58532b2..44ead56 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -47,10 +47,6 @@ message Components {
 
   // (Required) A map from pipeline-scoped id to Environment.
   map environments = 5;
-
-  // (Required) A map from pipeline-scoped id to FunctionSpec,
-  // a record for a particular user-defined function.
-  map function_specs = 6;
 }
 
 // A disjoint union of all the things that may contain references
@@ -207,8 +203,8 @@ message PCollection {
 // The payload for the primitive ParDo transform.
 message ParDoPayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the DoFn.
+  FunctionSpec do_fn = 1;
 
   // (Required) Additional pieces of context the DoFn may require that
   // are not otherwise represented in the payload.
@@ -266,9 +262,8 @@ enum IsBounded {
 // The payload for the primitive Read transform.
 message ReadPayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec of the source for
-  // this Read.
-  string source_id = 1;
+  // (Required) The FunctionSpec of the source for this Read.
+  FunctionSpec source = 1;
 
   // (Required) Whether the source is bounded or unbounded
   IsBounded is_bounded = 2;
@@ -279,15 +274,15 @@ message ReadPayload {
 // The payload for the WindowInto transform.
 message WindowIntoPayload {
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the WindowFn.
+  FunctionSpec window_fn = 1;
 }
 
 // The payload for the special-but-not-primitive Combine transform.
 message CombinePayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the CombineFn.
+  FunctionSpec combine_fn = 1;
 
   // (Required) A reference to the Coder to use for accumulators of the 
CombineFn
   string accumulator_coder_id = 2;
@@ -325,10 +320,10 @@ message Coder {
 // TODO: consider inlining field on PCollection
 message WindowingStrategy {
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // assigns windows, merges windows, and shifts timestamps before they are
+  // (Required) The FunctionSpec of the UDF that assigns windows,
+  // merges windows, and shifts timestamps before they are
   // combined according to the OutputTime.
-  string fn_id = 1;
+  FunctionSpec window_fn = 1;
 
   // (Required) Whether or not the window fn is merging.
   //
@@ -584,20 +579,20 @@ message SideInput {
   // URN)
   UrnWithParameter access_pattern = 1;
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // adapts a particular access_pattern to a user-facing view type.
+  // (Required) The FunctionSpec of the UDF that adapts a particular
+  // access_pattern to a user-facing view type.
   //
   // For example, View.asSingleton() may include a `view_fn` that adapts a
   // specially-designed multimap to a single value per window.
-  string view_fn_id = 2;
+  FunctionSpec view_fn = 2;
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // maps a main input window to a side input window.
+  // (Required) The FunctionSpec of the UDF that maps a main input window
+  // to a side input window.
   //
   // For example, when the main input is in fixed windows of one hour, this
   // can specify that the side input should be accessed according to the day
   // in which that hour falls.
-  string window_mapping_fn_id = 3;
+  

[2/2] beam git commit: Closes #2131

2017-03-01 Thread robertwb
Closes #2131


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d84b0679
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d84b0679
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d84b0679

Branch: refs/heads/master
Commit: d84b0679178ab803081ee3e05fe222d234e94c30
Parents: 79b1395 d390406
Author: Robert Bradshaw 
Authored: Wed Mar 1 09:04:31 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Mar 1 09:04:31 2017 -0800

--
 .../src/main/proto/beam_runner_api.proto| 39 +---
 .../beam/sdk/util/WindowingStrategies.java  | 18 ++---
 2 files changed, 20 insertions(+), 37 deletions(-)
--




[1/3] beam git commit: Include cython tests in presubmits for linux platform

2017-02-28 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 3f4e008bf -> 352af855e


Include cython tests in presubmits for linux platform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a03dd4e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a03dd4e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a03dd4e

Branch: refs/heads/master
Commit: 2a03dd4e1c0ca0d72f44101c15fa3b824e245628
Parents: b498e11
Author: Vikas Kedigehalli 
Authored: Tue Feb 28 10:09:40 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Feb 28 16:53:05 2017 -0800

--
 sdks/python/tox.ini | 22 +-
 1 file changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2a03dd4e/sdks/python/tox.ini
--
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 521f106..927c211 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -16,7 +16,7 @@
 ;
 
 [tox]
-envlist = py27,py27gcp,lint,docs
+envlist = py27,py27gcp,py27cython,lint,docs
 
 [pep8]
 # Disable all errors and warnings except for the ones related to blank lines.
@@ -34,6 +34,26 @@ commands =
   python setup.py test
 passenv = TRAVIS*
 
+[testenv:py27cython]
+# cython tests are only expected to work in linux (2.x and 3.x)
+# If we want to add other platforms in the future, it should be:
+# `platform = linux2|darwin|...`
+# See https://docs.python.org/2/library/sys.html#sys.platform for platform 
codes
+platform = linux2
+# autocomplete_test depends on nose when invoked directly.
+deps =
+  nose
+  cython
+commands =
+  python --version
+  pip install -e .[test]
+  python apache_beam/examples/complete/autocomplete_test.py
+  python setup.py test
+  # Clean up all cython generated files.
+  find apache_beam -type f -name '*.c' -delete
+  find apache_beam -type f -name '*.so' -delete
+passenv = TRAVIS*
+
 [testenv:py27gcp]
 # autocomplete_test depends on nose when invoked directly.
 deps =



[3/3] beam git commit: Closes #2128

2017-02-28 Thread robertwb
Closes #2128


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/352af855
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/352af855
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/352af855

Branch: refs/heads/master
Commit: 352af855edf9e07978d1d6d2a61fde372f927165
Parents: 3f4e008 2a03dd4
Author: Robert Bradshaw 
Authored: Tue Feb 28 16:53:06 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Feb 28 16:53:06 2017 -0800

--
 sdks/python/apache_beam/coders/stream.pxd |  2 +-
 sdks/python/tox.ini   | 22 +-
 2 files changed, 22 insertions(+), 2 deletions(-)
--




[2/3] beam git commit: Update output stream cython declaration

2017-02-28 Thread robertwb
Update output stream cython declaration


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b498e115
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b498e115
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b498e115

Branch: refs/heads/master
Commit: b498e115ac6ee043c45d2868ccf10e5132669c29
Parents: 3f4e008
Author: Vikas Kedigehalli 
Authored: Tue Feb 28 07:59:36 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Feb 28 16:53:05 2017 -0800

--
 sdks/python/apache_beam/coders/stream.pxd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b498e115/sdks/python/apache_beam/coders/stream.pxd
--
diff --git a/sdks/python/apache_beam/coders/stream.pxd 
b/sdks/python/apache_beam/coders/stream.pxd
index a1eb9f7..4e01a89 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -20,7 +20,7 @@ cimport libc.stdint
 
 cdef class OutputStream(object):
   cdef char* data
-  cdef size_t _size
+  cdef size_t buffer_size
   cdef size_t pos
 
   cpdef write(self, bytes b, bint nested=*)



[GitHub] beam pull request #2131: [BEAM-115] Inline rather than reference FunctionSpe...

2017-02-28 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2131

[BEAM-115] Inline rather than reference FunctionSpecs.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-protos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2131


commit cbfce74aae64f9b353c07ced8427dc77ab1c31f4
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-02-28T23:51:24Z

Inline rather than reference FunctionSpecs.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Make side inputs a map, rather than embedding the name in the message.

2017-02-28 Thread robertwb
Make side inputs a map, rather than embedding the name in the message.

The "local" name only makes sense in context.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d59b10f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d59b10f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d59b10f

Branch: refs/heads/master
Commit: 2d59b10f90bfaa1c455dee1545504d9da74a9163
Parents: 61e31e6
Author: Robert Bradshaw 
Authored: Mon Feb 27 17:59:57 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Feb 28 14:39:34 2017 -0800

--
 .../src/main/proto/beam_runner_api.proto| 25 +++-
 1 file changed, 9 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2d59b10f/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 258c278..58532b2 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -215,10 +215,9 @@ message ParDoPayload {
   // (may force runners to execute the ParDo differently)
   repeated Parameter parameters = 2;
 
-  // (Optional) An ordered list of side inputs, describing for each local name
-  // to the data to be provided and the expected access pattern.
-  // (the SDK may not be order-sensitive)
-  repeated SideInput side_inputs = 3;
+  // (Optional) A mapping of local input names to side inputs, describing
+  // the expected access pattern.
+  map side_inputs = 3;
 
   // (Optional) if the DoFn uses state, a list of the specs for cells.
   repeated StateSpec state_specs = 4;
@@ -298,10 +297,9 @@ message CombinePayload {
   // (may force runners to execute the ParDo differently)
   repeated Parameter parameters = 3;
 
-  // (Optional) An ordered list of side inputs, describing for each local name
-  // to the data to be provided and the expected access pattern.
-  // (the SDK may not be order-sensitive)
-  repeated SideInput side_inputs = 4;
+  // (Optional) A mapping of local input names to side inputs, describing
+  // the expected access pattern.
+  map side_inputs = 4;
 }
 
 // A coder, the binary format for serialization and deserialization of data in
@@ -575,11 +573,6 @@ message TimestampTransform {
 
 // A specification for how to "side input" a PCollection.
 message SideInput {
-
-  // (Required) A local name for this side input, as interpreted by its
-  // parent PTransform (and/or its PTransform's UDFs).
-  string name = 1;
-
   // (Required) URN of the access pattern required by the `view_fn` to present
   // the desired SDK-specific interface to a UDF.
   //
@@ -589,14 +582,14 @@ message SideInput {
   // The only access pattern intended for Beam, because of its superior
   // performance possibilities, is "urn:beam:sideinput:multimap" (or some such
   // URN)
-  UrnWithParameter access_pattern = 3;
+  UrnWithParameter access_pattern = 1;
 
   // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
   // adapts a particular access_pattern to a user-facing view type.
   //
   // For example, View.asSingleton() may include a `view_fn` that adapts a
   // specially-designed multimap to a single value per window.
-  string view_fn_id = 4;
+  string view_fn_id = 2;
 
   // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
   // maps a main input window to a side input window.
@@ -604,7 +597,7 @@ message SideInput {
   // For example, when the main input is in fixed windows of one hour, this
   // can specify that the side input should be accessed according to the day
   // in which that hour falls.
-  string window_mapping_fn_id = 5;
+  string window_mapping_fn_id = 3;
 }
 
 // An environment for executing UDFs. Generally an SDK container URL, but



[1/2] beam git commit: Closes #2124

2017-02-28 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 61e31e622 -> 3f4e008bf


Closes #2124


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3f4e008b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3f4e008b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3f4e008b

Branch: refs/heads/master
Commit: 3f4e008bf5231771094878d92256b674730630dd
Parents: 61e31e6 2d59b10
Author: Robert Bradshaw 
Authored: Tue Feb 28 14:39:34 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Feb 28 14:39:34 2017 -0800

--
 .../src/main/proto/beam_runner_api.proto| 25 +++-
 1 file changed, 9 insertions(+), 16 deletions(-)
--




[GitHub] beam pull request #2124: Make side inputs a map, rather than embedding the n...

2017-02-27 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2124

Make side inputs a map, rather than embedding the name in the message.

The "local" name only makes sense in context.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes.)
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-protos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2124


commit b2c0f93d0f4d63836a79956d75b6dd100e9138ba
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-02-28T01:59:57Z

Make side inputs a map, rather than embedding the name in the message.

The "local" name only makes sense in context.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/3] beam git commit: Make access_pattern a URN with params.

2017-02-27 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master c8dfb851c -> 16736a6b6


Make access_pattern a URN with params.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a0dcd3f9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a0dcd3f9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a0dcd3f9

Branch: refs/heads/master
Commit: a0dcd3f97f48cdca02bae4c371e4e143d00fdba1
Parents: c8dfb85
Author: Robert Bradshaw 
Authored: Fri Feb 24 15:53:39 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Feb 27 17:46:00 2017 -0800

--
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a0dcd3f9/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 989e4bb..ad6d0cb 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -591,7 +591,7 @@ message SideInput {
   // The only access pattern intended for Beam, because of its superior
   // performance possibilities, is "urn:beam:sideinput:multimap" (or some such
   // URN)
-  string access_pattern = 3;
+  UrnWithParameter access_pattern = 3;
 
   // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
   // adapts a particular access_pattern to a user-facing view type.



[3/3] beam git commit: Closes #2106

2017-02-27 Thread robertwb
Closes #2106


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16736a6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16736a6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16736a6b

Branch: refs/heads/master
Commit: 16736a6b612ba38b19694ae2a23dea7f5fab1563
Parents: c8dfb85 c04e8ab
Author: Robert Bradshaw 
Authored: Mon Feb 27 17:46:02 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Feb 27 17:46:02 2017 -0800

--
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 8 +++-
 1 file changed, 3 insertions(+), 5 deletions(-)
--




[2/3] beam git commit: Clarify side input name and remove redundant pcollection reference.

2017-02-27 Thread robertwb
Clarify side input name and remove redundant pcollection reference.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c04e8abd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c04e8abd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c04e8abd

Branch: refs/heads/master
Commit: c04e8abd41575a4e819c3f18dabff921338f55d8
Parents: a0dcd3f
Author: Robert Bradshaw 
Authored: Fri Feb 24 16:31:05 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Feb 27 17:46:01 2017 -0800

--
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c04e8abd/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index ad6d0cb..258c278 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -576,12 +576,10 @@ message TimestampTransform {
 // A specification for how to "side input" a PCollection.
 message SideInput {
 
-  // (Required) A local name for this side input, as embedded in a serialized UDF.
+  // (Required) A local name for this side input, as interpreted by its
+  // parent PTransform (and/or its PTransform's UDFs).
   string name = 1;
 
-  // (Required) The pipeline-scoped unique id of the PCollection to be side input.
-  string pcollection_id = 2;
-
   // (Required) URN of the access pattern required by the `view_fn` to present
   // the desired SDK-specific interface to a UDF.
   //



[GitHub] beam pull request #2106: [BEAM-115] More Runner API refinements.

2017-02-24 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/2106

[BEAM-115] More Runner API refinements.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-protos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2106


commit c2f5351e3731f29d024240016c553cecd3be3143
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-02-24T23:53:39Z

Make access_pattern a URN with params.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Attach original traceback in direct runner failure.

2017-02-10 Thread robertwb
Attach original traceback in direct runner failure.

This will make it much easier to debug pipelines.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fba260d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fba260d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fba260d

Branch: refs/heads/master
Commit: 5fba260d89040b4325c872c2764a3981ad189df8
Parents: effca63
Author: Robert Bradshaw 
Authored: Thu Feb 9 17:28:32 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Feb 10 13:22:02 2017 -0800

--
 .../apache_beam/runners/direct/executor.py  | 24 ++--
 1 file changed, 17 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5fba260d/sdks/python/apache_beam/runners/direct/executor.py
--
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 2d4a8bd..ff0d372 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
 import collections
 import logging
 import Queue
+import sys
 import threading
 import traceback
 from weakref import WeakValueDictionary
@@ -364,7 +365,8 @@ class _ExecutorServiceParallelExecutor(object):
 update = self.visible_updates.take()
 try:
   if update.exception:
-raise update.exception
+t, v, tb = update.exc_info
+raise t, v, tb
 finally:
   self.executor_service.shutdown()
 
@@ -426,6 +428,10 @@ class _ExecutorServiceParallelExecutor(object):
   assert bool(produced_bundle) != bool(exception)
   self.committed_bundle = produced_bundle
   self.exception = exception
+  self.exc_info = sys.exc_info()
+  if self.exc_info[1] is not exception:
+# Not the right exception.
+self.exc_info = (exception, None, None)
 
   class VisibleExecutorUpdate(object):
 """An update of interest to the user.
@@ -434,9 +440,10 @@ class _ExecutorServiceParallelExecutor(object):
 raise an exception.
 """
 
-def __init__(self, exception=None):
-  self.finished = exception is not None
-  self.exception = exception
+def __init__(self, exc_info=(None, None, None)):
+  self.finished = exc_info[0] is not None
+  self.exception = exc_info[1] or exc_info[0]
+  self.exc_info = exc_info
 
   class _MonitorTask(ExecutorService.CallableTask):
 """MonitorTask continuously runs to ensure that pipeline makes progress."""
@@ -460,7 +467,7 @@ class _ExecutorServiceParallelExecutor(object):
 update.exception)
 self._executor.visible_updates.offer(
 _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-update.exception))
+update.exc_info))
   update = self._executor.all_updates.poll()
 self._executor.evaluation_context.schedule_pending_unblocked_tasks(
 self._executor.executor_service)
@@ -468,7 +475,8 @@ class _ExecutorServiceParallelExecutor(object):
   except Exception as e:  # pylint: disable=broad-except
 logging.error('Monitor task died due to exception.\n %s', e)
 self._executor.visible_updates.offer(
-_ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
+_ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+sys.exc_info()))
   finally:
 if not self._should_shutdown():
   self._executor.executor_service.submit(self)
@@ -502,7 +510,9 @@ class _ExecutorServiceParallelExecutor(object):
   # Nothing is scheduled for execution, but watermarks incomplete.
   self._executor.visible_updates.offer(
   _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-  Exception('Monitor task detected a pipeline stall.')))
+  (Exception('Monitor task detected a pipeline stall.'),
+   None,
+   None)))
 self._executor.executor_service.shutdown()
 return True
 



[1/2] beam git commit: Closes #1970

2017-02-10 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master effca63f9 -> 51ef0009a


Closes #1970


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51ef0009
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51ef0009
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51ef0009

Branch: refs/heads/master
Commit: 51ef0009afd03b5a650836a8ae2cffab3c832476
Parents: effca63 5fba260
Author: Robert Bradshaw 
Authored: Fri Feb 10 13:22:02 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Feb 10 13:22:02 2017 -0800

--
 .../apache_beam/runners/direct/executor.py  | 24 ++--
 1 file changed, 17 insertions(+), 7 deletions(-)
--




[2/2] beam git commit: Closes #1959

2017-02-10 Thread robertwb
Closes #1959


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caaa64dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caaa64dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caaa64dc

Branch: refs/heads/master
Commit: caaa64dc6c3f92be7a53349e9d58a5630db855a5
Parents: 9c4a784 7126467
Author: Robert Bradshaw 
Authored: Fri Feb 10 09:47:48 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Feb 10 09:47:48 2017 -0800

--
 sdks/python/apache_beam/runners/common.py | 13 +
 sdks/python/apache_beam/transforms/window_test.py | 16 
 2 files changed, 25 insertions(+), 4 deletions(-)
--




[1/2] beam git commit: BEAM-1443: Preserve window duplication on window assignment.

2017-02-10 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 9c4a784bb -> caaa64dc6


BEAM-1443: Preserve window duplication on window assignment.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71264671
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71264671
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71264671

Branch: refs/heads/master
Commit: 712646714b8efec6b024ebd2ba0f0e066280f36e
Parents: 9c4a784
Author: Robert Bradshaw 
Authored: Wed Feb 8 17:58:32 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Feb 10 09:47:47 2017 -0800

--
 sdks/python/apache_beam/runners/common.py | 13 +
 sdks/python/apache_beam/transforms/window_test.py | 16 
 2 files changed, 25 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 6f86ca0..2c1032d 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -224,7 +224,7 @@ class DoFnRunner(Receiver):
 # Call for the process function for each window if has windowed side inputs
 # or if the process accesses the window parameter. We can just call it once
 # otherwise as none of the arguments are changing
-if self.has_windowed_inputs and len(element.windows) > 1:
+if self.has_windowed_inputs and len(element.windows) != 1:
   for w in element.windows:
 self._dofn_per_window_invoker(
 WindowedValue(element.value, element.timestamp, (w,)))
@@ -280,7 +280,7 @@ class DoFnRunner(Receiver):
 else:
   raise
 
-  def _process_outputs(self, element, results):
+  def _process_outputs(self, windowed_input_element, results):
 """Dispatch the result of computation to the appropriate receivers.
 
 A value wrapped in a SideOutputValue object will be unwrapped and
@@ -297,7 +297,10 @@ class DoFnRunner(Receiver):
 result = result.value
   if isinstance(result, WindowedValue):
 windowed_value = result
-  elif element is None:
+if (windowed_input_element is not None
+and len(windowed_input_element.windows) != 1):
+  windowed_value.windows *= len(windowed_input_element.windows)
+  elif windowed_input_element is None:
 # Start and finish have no element from which to grab context,
 # but may emit elements.
 if isinstance(result, TimestampedValue):
@@ -315,8 +318,10 @@ class DoFnRunner(Receiver):
 windowed_value = WindowedValue(
 result.value, result.timestamp,
 self.window_fn.assign(assign_context))
+if len(windowed_input_element.windows) != 1:
+  windowed_value.windows *= len(windowed_input_element.windows)
   else:
-windowed_value = element.with_value(result)
+windowed_value = windowed_input_element.with_value(result)
   if tag is None:
 self.main_receivers.receive(windowed_value)
   else:

http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/transforms/window_test.py
--
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1a21709..c4072ac 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -176,6 +176,22 @@ class WindowTest(unittest.TestCase):
   ('key', [5, 6, 7, 8, 9])]))
 p.run()
 
+  def test_rewindow(self):
+p = TestPipeline()
+result = (p
+  | Create([(k, k) for k in range(10)])
+  | Map(lambda (x, t): TimestampedValue(x, t))
+  | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+  # Per the model, each element is now duplicated across
+  # three windows. Rewindowing must preserve this duplication.
+  | 'rewindow' >> WindowInto(FixedWindows(5))
+  | 'rewindow2' >> WindowInto(FixedWindows(5))
+  | Map(lambda v: ('key', v))
+  | GroupByKey())
+assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+  ('key', sorted([5, 6, 7, 8, 9] * 3))]))
+p.run()
+
   def test_timestamped_with_combiners(self):
 p = TestPipeline()
 result = (p



[2/2] beam git commit: Closes #1969

2017-02-09 Thread robertwb
Closes #1969


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ab60d89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ab60d89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ab60d89

Branch: refs/heads/master
Commit: 6ab60d890d41518e0d6c6214bd03b5191157c7af
Parents: 70a702e 1677e10
Author: Robert Bradshaw 
Authored: Thu Feb 9 17:34:21 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Feb 9 17:34:21 2017 -0800

--
 sdks/python/apache_beam/io/textio.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[GitHub] beam pull request #1970: Attach original exception in direct runner failure.

2017-02-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/1970

Attach original exception in direct runner failure.

This will make it much easier to debug pipelines.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-exceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1970


commit 40fb5f01fe7327ff9b4de3d98166363b234873a0
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-02-10T01:28:32Z

Attach original exception in direct runner failure.

This will make it much easier to debug pipelines.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: [BEAM-1450] Fix NewDoFn handling of window explosion.

2017-02-09 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master c26d5827b -> cd6802bec


[BEAM-1450] Fix NewDoFn handling of window explosion.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b6fb1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b6fb1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b6fb1a

Branch: refs/heads/master
Commit: 37b6fb1a1c65ff53cef242321eb1ef4ddf48e022
Parents: c26d582
Author: Robert Bradshaw 
Authored: Thu Feb 9 12:18:48 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Feb 9 15:06:28 2017 -0800

--
 sdks/python/apache_beam/pipeline_test.py   |  7 +++
 sdks/python/apache_beam/runners/common.pxd |  3 +--
 sdks/python/apache_beam/runners/common.py  | 17 +++--
 3 files changed, 19 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 90b1a54..2f188aa 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -327,6 +327,13 @@ class DoFnTest(unittest.TestCase):
  | ParDo(TestDoFn()))
 assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
  (7, (0, 10)), (7, (5, 15))]))
+pcoll2 = pcoll | 'Again' >> ParDo(TestDoFn())
+assert_that(
+pcoll2,
+equal_to([
+((1, (-5, 5)), (-5, 5)), ((1, (0, 10)), (0, 10)),
+((7, (0, 10)), (0, 10)), ((7, (5, 15)), (5, 15))]),
+label='doubled windows')
 pipeline.run()
 
   def test_timestamp_param(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f36fdd0..781d96b 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -50,8 +50,7 @@ cdef class DoFnRunner(Receiver):
   cpdef process(self, WindowedValue element)
   cdef _dofn_invoker(self, WindowedValue element)
   cdef _dofn_simple_invoker(self, WindowedValue element)
-  cdef _dofn_window_invoker(
-  self, WindowedValue element, list args, dict kwargs, object window)
+  cdef _dofn_per_window_invoker(self, WindowedValue element)
 
   @cython.locals(windowed_value=WindowedValue)
   cpdef _process_outputs(self, WindowedValue element, results)

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 50ccf22..6f86ca0 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -197,7 +197,13 @@ class DoFnRunner(Receiver):
   def _dofn_simple_invoker(self, element):
 self._process_outputs(element, self.dofn_process(element.value))
 
-  def _dofn_window_invoker(self, element, args, kwargs, window):
+  def _dofn_per_window_invoker(self, element):
+if self.has_windowed_inputs:
+  window, = element.windows
+  args, kwargs = util.insert_values_in_args(
+  self.args, self.kwargs, [si[window] for si in self.side_inputs])
+else:
+  args, kwargs = self.args, self.kwargs
 # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
 for i, p in self.placeholders:
   if p == core.DoFn.ElementParam:
@@ -218,13 +224,12 @@ class DoFnRunner(Receiver):
 # Call for the process function for each window if has windowed side inputs
 # or if the process accesses the window parameter. We can just call it once
 # otherwise as none of the arguments are changing
-if self.has_windowed_inputs:
+if self.has_windowed_inputs and len(element.windows) > 1:
   for w in element.windows:
-args, kwargs = util.insert_values_in_args(
-self.args, self.kwargs, [si[w] for si in self.side_inputs])
-self._dofn_window_invoker(element, args, kwargs, w)
+self._dofn_per_window_invoker(
+WindowedValue(element.value, element.timestamp, (w,)))
 else:
-  self._dofn_window_invoker(element, self.args, self.kwargs, None)
+  self._dofn_per_window_invoker(element)
 
   def _invoke_bundle_method(self, method):
 try:



[GitHub] beam pull request #1965: [BEAM-1450] Fix NewDoFn handling of window explosio...

2017-02-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/1965

[BEAM-1450] Fix NewDoFn handling of window explosion.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam newdofn-windows

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1965


commit a7b5fe559e3796c2b31aa55ddb8ff2b4566f6a00
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-02-09T20:18:48Z

[BEAM-1450] Fix NewDoFn handling of window explosion.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Closes #1845

2017-02-06 Thread robertwb
Closes #1845


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f90558c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f90558c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f90558c3

Branch: refs/heads/master
Commit: f90558c34ec90c1f43b119b160971deddc8b2c61
Parents: a26fd1f 98e513b
Author: Robert Bradshaw 
Authored: Mon Feb 6 20:28:04 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Feb 6 20:28:04 2017 -0800

--
 .../apache_beam/examples/snippets/snippets.py   | 20 --
 .../examples/snippets/snippets_test.py  | 29 
 2 files changed, 40 insertions(+), 9 deletions(-)
--




[1/2] beam git commit: Add snippet for reading from compressed text sources

2017-02-06 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master a26fd1ff3 -> f90558c34


Add snippet for reading from compressed text sources


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98e513bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98e513bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98e513bc

Branch: refs/heads/master
Commit: 98e513bc64be3241d8a97902cba4808eec384a4b
Parents: a26fd1f
Author: Sourabh Bajaj 
Authored: Mon Feb 6 17:23:14 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Feb 6 20:28:03 2017 -0800

--
 .../apache_beam/examples/snippets/snippets.py   | 20 --
 .../examples/snippets/snippets_test.py  | 29 
 2 files changed, 40 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/98e513bc/sdks/python/apache_beam/examples/snippets/snippets.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 42d194e..c3879dc 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -828,8 +828,7 @@ def model_textio(renames):
   # [START model_textio_read]
   p = beam.Pipeline(options=PipelineOptions())
   # [START model_pipelineio_read]
-  lines = p | 'ReadFromText' >> beam.io.ReadFromText(
-  'gs://my_bucket/path/to/input-*.csv')
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
   # [END model_pipelineio_read]
   # [END model_textio_read]
 
@@ -837,7 +836,7 @@ def model_textio(renames):
   filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
   # [START model_pipelineio_write]
   filtered_words | 'WriteToText' >> beam.io.WriteToText(
-  'gs://my_bucket/path/to/numbers', file_name_suffix='.csv')
+  '/path/to/numbers', file_name_suffix='.csv')
   # [END model_pipelineio_write]
   # [END model_textio_write]
 
@@ -845,6 +844,21 @@ def model_textio(renames):
   p.run().wait_until_finish()
 
 
+def model_textio_compressed(renames, expected):
+  """Using a Read Transform to read compressed text files."""
+  p = TestPipeline()
+
+  # [START model_textio_write_compressed]
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+  '/path/to/input-*.csv.gz',
+  compression_type=beam.io.fileio.CompressionTypes.GZIP)
+  # [END model_textio_write_compressed]
+
+  beam.assert_that(lines, beam.equal_to(expected))
+  p.visit(SnippetUtils.RenameFiles(renames))
+  p.run().wait_until_finish()
+
+
 def model_datastoreio():
   """Using a Read and Write transform to read/write to Cloud Datastore."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/98e513bc/sdks/python/apache_beam/examples/snippets/snippets_test.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index a602b66..4827e94 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -18,6 +18,7 @@
 """Tests for all code snippets used in public docs."""
 
 import glob
+import gzip
 import logging
 import os
 import tempfile
@@ -355,13 +356,15 @@ class SnippetsTest(unittest.TestCase):
 To be used for testing.
 """
 
-def __init__(self, file_to_read=None):
+def __init__(self, file_to_read=None, compression_type=None):
   self.file_to_read = file_to_read
+  self.compression_type = compression_type
 
 class ReadDoFn(beam.DoFn):
 
-  def __init__(self, file_to_read):
+  def __init__(self, file_to_read, compression_type):
 self.file_to_read = file_to_read
+self.compression_type = compression_type
 self.coder = coders.StrUtf8Coder()
 
   def process(self, element):
@@ -370,13 +373,19 @@ class SnippetsTest(unittest.TestCase):
   def finish_bundle(self):
 assert self.file_to_read
 for file_name in glob.glob(self.file_to_read):
-  with open(file_name) as file:
-for record in file:
-  yield self.coder.decode(record.rstrip('\n'))
+  if self.compression_type is None:
+with open(file_name) as file:
+  for record in file:
+yield self.coder.decode(record.rstrip('\n'))
+  else:
+with gzip.open(file_name, 'r') as file:
+  for record in file:
+yield self.coder.decode(record.rstrip('\n'))
 
 def expand(self, pcoll):
   return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo(
-  

[1/2] beam git commit: Reduce cost of sample combine test

2017-02-01 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 2689ca43c -> 420a71860


Reduce cost of sample combine test

There's no need to run this test 300 times, which takes tens of seconds.

Also split global and per-key sampling into separate tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dc391d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dc391d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dc391d

Branch: refs/heads/master
Commit: d1dc391dd17345368a03187591a99ddf3d18282f
Parents: 2689ca4
Author: Robert Bradshaw 
Authored: Tue Jan 31 12:51:27 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Feb 1 10:56:53 2017 -0800

--
 .../apache_beam/transforms/combiners_test.py| 32 
 1 file changed, 12 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d1dc391d/sdks/python/apache_beam/transforms/combiners_test.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 8a6d352..ba8ae82 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -218,29 +218,21 @@ class CombineTest(unittest.TestCase):
 assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
 pipeline.run()
 
-  def test_sample(self):
+  def test_global_sample(self):
 
-# First test global samples (lots of them).
-for ix in xrange(300):
-  pipeline = TestPipeline()
+def is_good_sample(actual):
+  assert len(actual) == 1
+  assert sorted(actual[0]) in [[1, 1, 2], [1, 2, 2]], actual
+
+with TestPipeline() as pipeline:
   pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
-  result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
-
-  def matcher():
-def match(actual):
-  # There is always exactly one result.
-  equal_to([1])([len(actual)])
-  # There are always exactly three samples in the result.
-  equal_to([3])([len(actual[0])])
-  # Sampling is without replacement.
-  num_ones = sum(1 for x in actual[0] if x == 1)
-  num_twos = sum(1 for x in actual[0] if x == 2)
-  equal_to([1, 2])([num_ones, num_twos])
-return match
-  assert_that(result, matcher())
-  pipeline.run()
+  for ix in xrange(30):
+assert_that(
+pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3),
+is_good_sample,
+label='check-%d' % ix)
 
-# Now test per-key samples.
+  def test_per_key_sample(self):
 pipeline = TestPipeline()
 pcoll = pipeline | 'start-perkey' >> Create(
 sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))



[1/3] beam git commit: Implement combiner lifting for direct runner.

2017-01-31 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master b80aac5e3 -> e9cd41165


Implement combiner lifting for direct runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87

Branch: refs/heads/master
Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1
Parents: b80aac5
Author: Robert Bradshaw 
Authored: Tue Jan 31 11:41:09 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 31 15:35:21 2017 -0800

--
 .../apache_beam/runners/direct/direct_runner.py | 15 +++-
 .../runners/direct/helper_transforms.py | 77 
 2 files changed, 88 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py
--
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index dc2668d..28dc012 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner):
   def __init__(self):
 self._cache = None
 
+  def apply_CombinePerKey(self, transform, pcoll):
+# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
+# with resolving imports when they are at top.
+# pylint: disable=wrong-import-position
+from apache_beam.runners.direct.helper_transforms import 
LiftedCombinePerKey
+try:
+  return pcoll | LiftedCombinePerKey(
+transform.fn, transform.args, transform.kwargs)
+except NotImplementedError:
+  return transform.expand(pcoll)
+
   def run(self, pipeline):
 """Execute the entire pipeline and returns an DirectPipelineResult."""
 
@@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner):
   self._cache = BufferingInMemoryCache()
 return self._cache.pvalue_cache
 
-  def apply(self, transform, input):  # pylint: disable=redefined-builtin
-"""Runner callback for a pipeline.apply call."""
-return transform.expand(input)
-
 
 class BufferingInMemoryCache(object):
   """PValueCache wrapper for buffering bundles until a PValue is fully 
computed.

http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py
--
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py 
b/sdks/python/apache_beam/runners/direct/helper_transforms.py
new file mode 100644
index 000..340db75
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import itertools
+
+import apache_beam as beam
+from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.internal.util import ArgumentPlaceholder
+
+
+class LiftedCombinePerKey(beam.PTransform):
+  """An implementation of CombinePerKey that does mapper-side pre-combining.
+  """
+  def __init__(self, combine_fn, args, kwargs):
+if any(isinstance(arg, ArgumentPlaceholder)
+   for arg in itertools.chain(args, kwargs.values())):
+  # This isn't implemented in dataflow either...
+  raise NotImplementedError('Deferred CombineFn side inputs.')
+self._combine_fn = beam.transforms.combiners.curry_combine_fn(
+combine_fn, args, kwargs)
+
+  def expand(self, pcoll):
+return (pcoll
+  | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+  | beam.GroupByKey()
+  | beam.ParDo(FinishCombine(self._combine_fn)))
+
+
+class PartialGroupByKeyCombiningValues(beam.DoFn):
+  """Aggregates values into a per-key-window cache.
+
+  As bundles are in-memory-sized, we don't bother flushing until the very end.
+  """
+  def __init__(self, combine_fn):
+ 

[GitHub] beam pull request #1882: Combiner lifting

2017-01-31 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam/pull/1882

Combiner lifting

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam combiner-lifting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1882


commit d25b14fc74f24bc88debbf3fd42a3bc775249462
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-01-31T19:41:09Z

Implement combiner lifting for direct runner.

commit 07e4154d4afbd0cbdda4419a7c18e6e338cef869
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-01-31T20:41:22Z

Fix typehints tests.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #131: Clarify maturity of various runners.

2017-01-27 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/beam-site/pull/131

Clarify maturity of various runners.

R: @francesperry 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam-site patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #131


commit 1835c0efdc17177a7fe6dd9e8d554fe89c4ba731
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2017-01-27T20:14:03Z

Clarify maturity of various runners.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/3] beam git commit: Updating dataflow client protos to add new metrics.

2017-01-27 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/python-sdk 3d6f20d67 -> 52d97e2fc


http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
--
diff --git 
a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py 
b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
index 178a542..a42154e 100644
--- 
a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ 
b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -24,6 +24,7 @@ and continuous computation.
 
 from apitools.base.protorpclite import messages as _messages
 from apitools.base.py import encoding
+from apitools.base.py import extra_types
 
 
 package = 'dataflow'
@@ -193,6 +194,7 @@ class CounterMetadata(_messages.Message):
   AND: Aggregated value represents the logical 'and' of all contributed
 values.
   SET: Aggregated value is a set of unique contributed values.
+  DISTRIBUTION: Aggregated value captures statistics about a distribution.
 """
 INVALID = 0
 SUM = 1
@@ -202,6 +204,7 @@ class CounterMetadata(_messages.Message):
 OR = 5
 AND = 6
 SET = 7
+DISTRIBUTION = 8
 
   class StandardUnitsValueValuesEnum(_messages.Enum):
 """System defined Units, see above enum.
@@ -308,6 +311,7 @@ class CounterUpdate(_messages.Message):
   aggregate value accumulated since the worker started working on this
   WorkItem. By default this is false, indicating that this counter is
   reported as a delta.
+distribution: Distribution data
 floatingPoint: Floating point value for Sum, Max, Min.
 floatingPointList: List of floating point numbers, for Set.
 floatingPointMean: Floating point mean aggregation value for Mean.
@@ -326,34 +330,38 @@ class CounterUpdate(_messages.Message):
 
   boolean = _messages.BooleanField(1)
   cumulative = _messages.BooleanField(2)
-  floatingPoint = _messages.FloatField(3)
-  floatingPointList = _messages.MessageField('FloatingPointList', 4)
-  floatingPointMean = _messages.MessageField('FloatingPointMean', 5)
-  integer = _messages.MessageField('SplitInt64', 6)
-  integerList = _messages.MessageField('IntegerList', 7)
-  integerMean = _messages.MessageField('IntegerMean', 8)
-  internal = _messages.MessageField('extra_types.JsonValue', 9)
-  nameAndKind = _messages.MessageField('NameAndKind', 10)
-  shortId = _messages.IntegerField(11)
-  stringList = _messages.MessageField('StringList', 12)
-  structuredNameAndMetadata = 
_messages.MessageField('CounterStructuredNameAndMetadata', 13)
+  distribution = _messages.MessageField('DistributionUpdate', 3)
+  floatingPoint = _messages.FloatField(4)
+  floatingPointList = _messages.MessageField('FloatingPointList', 5)
+  floatingPointMean = _messages.MessageField('FloatingPointMean', 6)
+  integer = _messages.MessageField('SplitInt64', 7)
+  integerList = _messages.MessageField('IntegerList', 8)
+  integerMean = _messages.MessageField('IntegerMean', 9)
+  internal = _messages.MessageField('extra_types.JsonValue', 10)
+  nameAndKind = _messages.MessageField('NameAndKind', 11)
+  shortId = _messages.IntegerField(12)
+  stringList = _messages.MessageField('StringList', 13)
+  structuredNameAndMetadata = 
_messages.MessageField('CounterStructuredNameAndMetadata', 14)
 
 
 class CreateJobFromTemplateRequest(_messages.Message):
-  """Request to create a Dataflow job.
+  """A request to create a Cloud Dataflow job from a template.
 
   Messages:
-ParametersValue: Dynamic parameterization of the job's runtime
-  environment.
+ParametersValue: The runtime parameters to pass to the job.
 
   Fields:
-gcsPath: A path to the serialized JSON representation of the job.
-parameters: Dynamic parameterization of the job's runtime environment.
+environment: The runtime environment for the job.
+gcsPath: Required. A Cloud Storage path to the template from which to
+  create the job. Must be a valid Cloud Storage URL, beginning with
+  `gs://`.
+jobName: Required. The job name to use for the created job.
+parameters: The runtime parameters to pass to the job.
   """
 
   @encoding.MapUnrecognizedFields('additionalProperties')
   class ParametersValue(_messages.Message):
-"""Dynamic parameterization of the job's runtime environment.
+"""The runtime parameters to pass to the job.
 
 Messages:
   AdditionalProperty: An additional property for a ParametersValue object.
@@ -375,8 +383,10 @@ class CreateJobFromTemplateRequest(_messages.Message):
 
 additionalProperties = _messages.MessageField('AdditionalProperty', 1, 
repeated=True)
 
-  gcsPath = _messages.StringField(1)
-  parameters = _messages.MessageField('ParametersValue', 2)
+  environment = _messages.MessageField('RuntimeEnvironment', 1)
+  gcsPath = _messages.StringField(2)

[3/3] beam git commit: Closes #1857

2017-01-27 Thread robertwb
Closes #1857


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52d97e2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52d97e2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52d97e2f

Branch: refs/heads/python-sdk
Commit: 52d97e2fc2e383a58969447addd45ebe3eed4f5f
Parents: 3d6f20d 901a14c
Author: Robert Bradshaw 
Authored: Fri Jan 27 12:00:25 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Jan 27 12:00:25 2017 -0800

--
 .../clients/dataflow/dataflow_v1b3_client.py| 578 
 .../clients/dataflow/dataflow_v1b3_messages.py  | 931 +--
 2 files changed, 1075 insertions(+), 434 deletions(-)
--




[2/3] beam git commit: Updating dataflow client protos to add new metrics.

2017-01-27 Thread robertwb
Updating dataflow client protos to add new metrics.

To use structured counter names and add new metrics (e.g.
distributions), we need to update the Dataflow client protocol buffers.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/901a14c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/901a14c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/901a14c4

Branch: refs/heads/python-sdk
Commit: 901a14c47bd21fadd563de1017f7c9d2b38cf4f1
Parents: 3d6f20d
Author: Pablo 
Authored: Thu Jan 26 18:02:04 2017 -0800
Committer: Pablo 
Committed: Fri Jan 27 11:46:26 2017 -0800

--
 .../clients/dataflow/dataflow_v1b3_client.py| 578 
 .../clients/dataflow/dataflow_v1b3_messages.py  | 931 +--
 2 files changed, 1075 insertions(+), 434 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py
--
diff --git 
a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py 
b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py
index 840b887..6ae2b73 100644
--- a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py
+++ b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py
@@ -55,6 +55,10 @@ class DataflowV1b3(base_api.BaseApiClient):
 self.projects_jobs_messages = self.ProjectsJobsMessagesService(self)
 self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self)
 self.projects_jobs = self.ProjectsJobsService(self)
+self.projects_locations_jobs_messages = 
self.ProjectsLocationsJobsMessagesService(self)
+self.projects_locations_jobs_workItems = 
self.ProjectsLocationsJobsWorkItemsService(self)
+self.projects_locations_jobs = self.ProjectsLocationsJobsService(self)
+self.projects_locations = self.ProjectsLocationsService(self)
 self.projects_templates = self.ProjectsTemplatesService(self)
 self.projects = self.ProjectsService(self)
 
@@ -65,33 +69,6 @@ class DataflowV1b3(base_api.BaseApiClient):
 
 def __init__(self, client):
   super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client)
-  self._method_configs = {
-  'GetConfig': base_api.ApiMethodInfo(
-  http_method=u'POST',
-  method_id=u'dataflow.projects.jobs.debug.getConfig',
-  ordered_params=[u'projectId', u'jobId'],
-  path_params=[u'jobId', u'projectId'],
-  query_params=[],
-  
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig',
-  request_field=u'getDebugConfigRequest',
-  request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest',
-  response_type_name=u'GetDebugConfigResponse',
-  supports_download=False,
-  ),
-  'SendCapture': base_api.ApiMethodInfo(
-  http_method=u'POST',
-  method_id=u'dataflow.projects.jobs.debug.sendCapture',
-  ordered_params=[u'projectId', u'jobId'],
-  path_params=[u'jobId', u'projectId'],
-  query_params=[],
-  
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture',
-  request_field=u'sendDebugCaptureRequest',
-  request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest',
-  response_type_name=u'SendDebugCaptureResponse',
-  supports_download=False,
-  ),
-  }
-
   self._upload_configs = {
   }
 
@@ -108,6 +85,19 @@ class DataflowV1b3(base_api.BaseApiClient):
   return self._RunMethod(
   config, request, global_params=global_params)
 
+GetConfig.method_config = lambda: base_api.ApiMethodInfo(
+http_method=u'POST',
+method_id=u'dataflow.projects.jobs.debug.getConfig',
+ordered_params=[u'projectId', u'jobId'],
+path_params=[u'jobId', u'projectId'],
+query_params=[],
+
relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig',
+request_field=u'getDebugConfigRequest',
+request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest',
+response_type_name=u'GetDebugConfigResponse',
+supports_download=False,
+)
+
 def SendCapture(self, request, global_params=None):
   """Send encoded debug capture data for component.
 
@@ -121,6 +111,19 @@ class DataflowV1b3(base_api.BaseApiClient):
   return self._RunMethod(
   config, request, global_params=global_params)
 
+SendCapture.method_config = lambda: base_api.ApiMethodInfo(
+http_method=u'POST',
+

[1/2] beam git commit: Refactoring metrics infrastructure

2017-01-26 Thread robertwb
ainer_stack()
+self.PER_THREAD.container.pop()
 
-  @classmethod
-  def set_current_container(cls, container):
-cls.PER_THREAD.container = container
 
-  @classmethod
-  def unset_current_container(cls):
-del cls.PER_THREAD.container
+MetricsEnvironment = _MetricsEnvironment()
 
 
 class MetricsContainer(object):
@@ -180,16 +193,21 @@ class MetricsContainer(object):
 
 
 class ScopedMetricsContainer(object):
-  def __init__(self, container):
-self._old_container = MetricsEnvironment.current_container()
+  def __init__(self, container=None):
+self._stack = MetricsEnvironment.container_stack()
 self._container = container
 
+  def enter(self):
+self._stack.append(self._container)
+
+  def exit(self):
+self._stack.pop()
+
   def __enter__(self):
-MetricsEnvironment.set_current_container(self._container)
-return self._container
+self.enter()
 
   def __exit__(self, type, value, traceback):
-MetricsEnvironment.set_current_container(self._old_container)
+self.exit()
 
 
 class MetricUpdates(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd 
b/sdks/python/apache_beam/runners/common.pxd
index 10d1f96..f41b313 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -18,6 +18,7 @@
 cimport cython
 
 from apache_beam.utils.windowed_value cimport WindowedValue
+from apache_beam.metrics.execution cimport ScopedMetricsContainer
 
 
 cdef type SideOutputValue, TimestampedValue
@@ -40,6 +41,7 @@ cdef class DoFnRunner(Receiver):
   cdef object args
   cdef dict kwargs
   cdef object side_inputs
+  cdef ScopedMetricsContainer scoped_metrics_container
   cdef bint has_windowed_side_inputs
 
   cdef Receiver main_receivers

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 3741582..cb47513 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -22,6 +22,7 @@
 import sys
 
 from apache_beam.internal import util
+from apache_beam.metrics.execution import ScopedMetricsContainer
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
 from apache_beam.transforms import window
@@ -69,7 +70,8 @@ class DoFnRunner(Receiver):
logging_context=None,
    # Preferred alternative to context
# TODO(robertwb): Remove once all runners are updated.
-   state=None):
+   state=None,
+   scoped_metrics_container=None):
 """Initializes a DoFnRunner.
 
 Args:
@@ -84,10 +86,13 @@ class DoFnRunner(Receiver):
   step_name: the name of this step
   logging_context: a LoggingContext object
   state: handle for accessing DoFn state
+  scoped_metrics_container: Context switcher for metrics container
 """
 self.step_name = step_name
 self.window_fn = windowing.windowfn
 self.tagged_receivers = tagged_receivers
+self.scoped_metrics_container = (scoped_metrics_container
+ or ScopedMetricsContainer())
 
 global_window = window.GlobalWindow()
 
@@ -236,6 +241,7 @@ class DoFnRunner(Receiver):
   def _invoke_bundle_method(self, method):
 try:
   self.logging_context.enter()
+  self.scoped_metrics_container.enter()
   self.context.set_element(None)
   f = getattr(self.dofn, method)
 
@@ -251,6 +257,7 @@ class DoFnRunner(Receiver):
 except BaseException as exn:
   self.reraise_augmented(exn)
 finally:
+  self.scoped_metrics_container.exit()
   self.logging_context.exit()
 
   def start(self):
@@ -262,6 +269,7 @@ class DoFnRunner(Receiver):
   def process(self, element):
 try:
   self.logging_context.enter()
+  self.scoped_metrics_container.enter()
   if self.is_new_dofn:
 self.new_dofn_process(element)
   else:
@@ -269,6 +277,7 @@ class DoFnRunner(Receiver):
 except BaseException as exn:
   self.reraise_augmented(exn)
 finally:
+  self.scoped_metrics_container.exit()
   self.logging_context.exit()
 
   def reraise_augmented(self, exn):

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/executor.py
--
diff --git a/sdks/python/apache_beam/runners/direct/executor.py 
b/sdks/python/apache_beam/runners/direct/executor.py
index 7e404f8..2d4a8bd 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -27,7 +27,7 @@ impo

[2/2] beam git commit: Closes #1835

2017-01-26 Thread robertwb
Closes #1835


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d6f20d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d6f20d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d6f20d6

Branch: refs/heads/python-sdk
Commit: 3d6f20d677bd7397e6e2d099c829bf4c439f8d18
Parents: e3849af b148f5c
Author: Robert Bradshaw 
Authored: Thu Jan 26 15:28:50 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Jan 26 15:28:50 2017 -0800

--
 sdks/python/apache_beam/metrics/execution.pxd   | 31 +
 sdks/python/apache_beam/metrics/execution.py| 70 
 sdks/python/apache_beam/runners/common.pxd  |  2 +
 sdks/python/apache_beam/runners/common.py   | 11 ++-
 .../apache_beam/runners/direct/executor.py  | 12 ++--
 .../runners/direct/transform_evaluator.py   | 54 ---
 sdks/python/setup.py|  1 +
 7 files changed, 125 insertions(+), 56 deletions(-)
--




[1/2] beam git commit: Fix read/write display data

2017-01-26 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/python-sdk c6420df97 -> e3849af8c


Fix read/write display data


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3

Branch: refs/heads/python-sdk
Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20
Parents: c6420df
Author: Pablo 
Authored: Fri Jan 13 11:25:36 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Jan 26 14:51:56 2017 -0800

--
 sdks/python/apache_beam/io/avroio_test.py |  6 
 sdks/python/apache_beam/io/fileio.py  | 10 ++-
 sdks/python/apache_beam/io/fileio_test.py |  2 --
 sdks/python/apache_beam/io/iobase.py  | 38 +-
 sdks/python/apache_beam/io/textio.py  | 25 +
 sdks/python/apache_beam/io/textio_test.py | 30 
 6 files changed, 47 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py
--
diff --git a/sdks/python/apache_beam/io/avroio_test.py 
b/sdks/python/apache_beam/io/avroio_test.py
index aed468d..d2fb1d1 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase):
 'file_pattern',
 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
 DisplayDataItemMatcher(
-'shards',
-0),
-DisplayDataItemMatcher(
 'codec',
 'null'),
 DisplayDataItemMatcher(
@@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase):
 'file_pattern',
 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'),
 DisplayDataItemMatcher(
-'shards',
-0),
-DisplayDataItemMatcher(
 'codec',
 'deflate'),
 DisplayDataItemMatcher(

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index 52f31c6..f67dca9 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -547,7 +547,8 @@ class FileSink(iobase.Sink):
 
   def display_data(self):
 return {'shards':
-DisplayDataItem(self.num_shards, label='Number of Shards'),
+DisplayDataItem(self.num_shards,
+label='Number of Shards').drop_if_default(0),
 'compression':
 DisplayDataItem(str(self.compression_type)),
 'file_pattern':
@@ -787,6 +788,13 @@ class TextFileSink(FileSink):
   '\'textio.WriteToText()\' instead of directly '
   'instantiating a TextFileSink object.')
 
+  def display_data(self):
+dd_parent = super(TextFileSink, self).display_data()
+dd_parent['append_newline'] = DisplayDataItem(
+self.append_trailing_newlines,
+label='Append Trailing New Lines')
+return dd_parent
+
   def write_encoded_record(self, file_handle, encoded_value):
 """Writes a single encoded record."""
 file_handle.write(encoded_value)

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py
--
diff --git a/sdks/python/apache_beam/io/fileio_test.py 
b/sdks/python/apache_beam/io/fileio_test.py
index ad77dc5..6c33f53 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase):
 dd = DisplayData.create_from(sink)
 expected_items = [
 DisplayDataItemMatcher(
-'shards', 0),
-DisplayDataItemMatcher(
 'compression', 'auto'),
 DisplayDataItemMatcher(
 'file_pattern',

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py
--
diff --git a/sdks/python/apache_beam/io/iobase.py 
b/sdks/python/apache_beam/io/iobase.py
index 12af3b6..1266ed3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform):
   write_result_coll = (keyed_pcoll
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
-   | 'WriteBundles' >> core.Map(
-  

[2/2] beam git commit: Closes #1776

2017-01-26 Thread robertwb
Closes #1776


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3849af8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3849af8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3849af8

Branch: refs/heads/python-sdk
Commit: e3849af8c8b0982de07f2c24417042be91474039
Parents: c6420df e4eda3c
Author: Robert Bradshaw 
Authored: Thu Jan 26 14:51:57 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Jan 26 14:51:57 2017 -0800

--
 sdks/python/apache_beam/io/avroio_test.py |  6 
 sdks/python/apache_beam/io/fileio.py  | 10 ++-
 sdks/python/apache_beam/io/fileio_test.py |  2 --
 sdks/python/apache_beam/io/iobase.py  | 38 +-
 sdks/python/apache_beam/io/textio.py  | 25 +
 sdks/python/apache_beam/io/textio_test.py | 30 
 6 files changed, 47 insertions(+), 64 deletions(-)
--




[GitHub] beam pull request #1837: [BEAM-1218 ] Remove dataflow_test.py

2017-01-25 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/beam/pull/1837


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: Closes #1837

2017-01-25 Thread robertwb
Closes #1837


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6420df9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6420df9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6420df9

Branch: refs/heads/python-sdk
Commit: c6420df9791eb6083fba1f74bd88e06ce8f6a61f
Parents: 4e1028b 2aa7d47
Author: Robert Bradshaw 
Authored: Wed Jan 25 16:18:10 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:18:10 2017 -0800

--
 sdks/python/apache_beam/dataflow_test.py| 418 ---
 .../apache_beam/transforms/ptransform_test.py   |  67 +++
 .../apache_beam/transforms/sideinputs_test.py   | 208 -
 3 files changed, 274 insertions(+), 419 deletions(-)
--




[1/2] beam git commit: Closes #1844

2017-01-25 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/python-sdk 592422059 -> 4e1028b3d


Closes #1844


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e1028b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e1028b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e1028b3

Branch: refs/heads/python-sdk
Commit: 4e1028b3dfeaf02e51eb9f3b5d1a5e78c1cfcbb9
Parents: 5924220 5787e81
Author: Robert Bradshaw 
Authored: Wed Jan 25 16:16:52 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:16:52 2017 -0800

--
 .../python/apache_beam/utils/dependency_test.py | 47 +++-
 1 file changed, 27 insertions(+), 20 deletions(-)
--




[2/2] beam git commit: Use a temp directory for requirements cache in test_with_requirements_file

2017-01-25 Thread robertwb
Use a temp directory for requirements cache in
test_with_requirements_file

The test fails if there are leftover files in the default folder for
the requirements cache, either from earlier tests or from previous
workspaces.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5787e817
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5787e817
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5787e817

Branch: refs/heads/python-sdk
Commit: 5787e817a7eda4859963d535df21f2fa00edf8af
Parents: 5924220
Author: Ahmet Altay 
Authored: Wed Jan 25 09:57:18 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:16:52 2017 -0800

--
 .../python/apache_beam/utils/dependency_test.py | 47 +++-
 1 file changed, 27 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5787e817/sdks/python/apache_beam/utils/dependency_test.py
--
diff --git a/sdks/python/apache_beam/utils/dependency_test.py 
b/sdks/python/apache_beam/utils/dependency_test.py
index a484d60..75a89e2 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -106,27 +106,34 @@ class SetupTest(unittest.TestCase):
 dependency.stage_job_resources(options))
 
   def test_with_requirements_file(self):
-staging_dir = tempfile.mkdtemp()
-source_dir = tempfile.mkdtemp()
+try:
+  staging_dir = tempfile.mkdtemp()
+  requirements_cache_dir = tempfile.mkdtemp()
+  source_dir = tempfile.mkdtemp()
 
-options = PipelineOptions()
-options.view_as(GoogleCloudOptions).staging_location = staging_dir
-self.update_options(options)
-options.view_as(SetupOptions).requirements_file = os.path.join(
-source_dir, dependency.REQUIREMENTS_FILE)
-self.create_temp_file(
-os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-self.assertEqual(
-sorted([dependency.REQUIREMENTS_FILE,
-'abc.txt', 'def.txt']),
-sorted(dependency.stage_job_resources(
-options,
-populate_requirements_cache=self.populate_requirements_cache)))
-self.assertTrue(
-os.path.isfile(
-os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+  options = PipelineOptions()
+  options.view_as(GoogleCloudOptions).staging_location = staging_dir
+  self.update_options(options)
+  options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+  options.view_as(SetupOptions).requirements_file = os.path.join(
+  source_dir, dependency.REQUIREMENTS_FILE)
+  self.create_temp_file(
+  os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+  self.assertEqual(
+  sorted([dependency.REQUIREMENTS_FILE,
+  'abc.txt', 'def.txt']),
+  sorted(dependency.stage_job_resources(
+  options,
+  populate_requirements_cache=self.populate_requirements_cache)))
+  self.assertTrue(
+  os.path.isfile(
+  os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+  self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+  self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+finally:
+  shutil.rmtree(staging_dir)
+  shutil.rmtree(requirements_cache_dir)
+  shutil.rmtree(source_dir)
 
   def test_requirements_file_not_present(self):
 staging_dir = tempfile.mkdtemp()



[GitHub] beam pull request #1811: Cleanup tests in pipeline_test.

2017-01-25 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/beam/pull/1811


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


<    13   14   15   16   17   18   19   >