Repository: beam
Updated Branches:
  refs/heads/master 99f93eb07 -> 4ccbdbc38


Rename NewDoFn to DoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1af093b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1af093b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1af093b1

Branch: refs/heads/master
Commit: 1af093b10dda8e4daf505257d60f46b4bc38e7eb
Parents: 99f93eb
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Thu Feb 2 21:29:47 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Feb 3 09:47:48 2017 -0800

----------------------------------------------------------------------
 .../examples/complete/top_wikipedia_sessions.py | 10 +++----
 .../examples/cookbook/datastore_wordcount.py    |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 10 +++----
 .../examples/snippets/snippets_test.py          | 14 ++++-----
 sdks/python/apache_beam/examples/wordcount.py   |  2 +-
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 .../apache_beam/io/datastore/v1/datastoreio.py  |  8 +++---
 sdks/python/apache_beam/io/iobase.py            |  6 ++--
 sdks/python/apache_beam/pipeline_test.py        | 24 ++++++++--------
 sdks/python/apache_beam/runners/common.py       | 30 ++++++++++----------
 .../consumer_tracking_pipeline_visitor_test.py  |  6 ++--
 .../runners/direct/helper_transforms.py         |  6 ++--
 .../runners/direct/transform_evaluator.py       | 18 ++++++------
 sdks/python/apache_beam/runners/runner_test.py  |  4 +--
 sdks/python/apache_beam/transforms/core.py      | 28 +++++++++---------
 .../apache_beam/transforms/display_test.py      |  8 +++---
 .../apache_beam/transforms/ptransform_test.py   | 18 ++++++------
 .../apache_beam/transforms/window_test.py       |  4 +--
 sdks/python/apache_beam/typehints/typecheck.py  | 22 +++++++-------
 .../typehints/typed_pipeline_test.py            |  4 +--
 21 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 43a4ee2..d7fbe30 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -59,7 +59,7 @@ THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
 MAX_TIMESTAMP = 0x7fffffffffffffff
 
 
-class ExtractUserAndTimestampDoFn(beam.NewDoFn):
+class ExtractUserAndTimestampDoFn(beam.DoFn):
   """Extracts user and timestamp representing a Wikipedia edit."""
 
   def process(self, element):
@@ -103,17 +103,17 @@ class TopPerMonth(beam.PTransform):
             .without_defaults())
 
 
-class SessionsToStringsDoFn(beam.NewDoFn):
+class SessionsToStringsDoFn(beam.DoFn):
   """Adds the session information to be part of the key."""
 
-  def process(self, element, window=beam.NewDoFn.WindowParam):
+  def process(self, element, window=beam.DoFn.WindowParam):
     yield (element[0] + ' : ' + str(window), element[1])
 
 
-class FormatOutputDoFn(beam.NewDoFn):
+class FormatOutputDoFn(beam.DoFn):
   """Formats a string containing the user, count, and session."""
 
-  def process(self, element, window=beam.NewDoFn.WindowParam):
+  def process(self, element, window=beam.DoFn.WindowParam):
     for kv in element:
       session = kv[0]
       count = kv[1]

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 42cd59b..282afbf 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -84,7 +84,7 @@ word_length_counter = Metrics.counter('main', 'word_lengths')
 word_counter = Metrics.counter('main', 'total_words')
 
 
-class WordExtractingDoFn(beam.NewDoFn):
+class WordExtractingDoFn(beam.DoFn):
   """Parse each line of input text into words."""
 
   def process(self, element):

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 0cbbf9a..26e97c7 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -60,7 +60,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
-class SplitLinesToWordsFn(beam.NewDoFn):
+class SplitLinesToWordsFn(beam.DoFn):
   """A transform to split a line of text into individual words.
 
   This transform will have 3 outputs:

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 105e3fb..42d194e 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -305,7 +305,7 @@ def pipeline_logging(lines, output):
   # import Python logging module.
   import logging
 
-  class ExtractWordsFn(beam.NewDoFn):
+  class ExtractWordsFn(beam.DoFn):
 
     def process(self, element):
       words = re.findall(r'[A-Za-z\']+', element)
@@ -346,14 +346,14 @@ def pipeline_monitoring(renames):
                           help='output for the pipeline',
                           default='gs://my-bucket/output')
 
-  class ExtractWordsFn(beam.NewDoFn):
+  class ExtractWordsFn(beam.DoFn):
 
     def process(self, element):
       words = re.findall(r'[A-Za-z\']+', element)
       for word in words:
         yield word
 
-  class FormatCountsFn(beam.NewDoFn):
+  class FormatCountsFn(beam.DoFn):
 
     def process(self, element):
       word, count = element
@@ -490,7 +490,7 @@ def examples_wordcount_wordcount(renames):
   # [END examples_wordcount_wordcount_composite]
 
   # [START examples_wordcount_wordcount_dofn]
-  class FormatAsTextFn(beam.NewDoFn):
+  class FormatAsTextFn(beam.DoFn):
 
     def process(self, element):
       word, count = element
@@ -514,7 +514,7 @@ def examples_wordcount_debugging(renames):
   # [START example_wordcount_debugging_aggregators]
   import logging
 
-  class FilterTextFn(beam.NewDoFn):
+  class FilterTextFn(beam.DoFn):
     """A DoFn that filters for a specific key based on a regular expression."""
 
     def __init__(self, pattern):

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index dcc4add..a602b66 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -47,7 +47,7 @@ class ParDoTest(unittest.TestCase):
     words = ['aa', 'bbb', 'c']
 
     # [START model_pardo_pardo]
-    class ComputeWordLengthFn(beam.NewDoFn):
+    class ComputeWordLengthFn(beam.DoFn):
       def process(self, element):
         return [len(element)]
     # [END model_pardo_pardo]
@@ -62,7 +62,7 @@ class ParDoTest(unittest.TestCase):
     words = ['aa', 'bbb', 'c']
 
     # [START model_pardo_yield]
-    class ComputeWordLengthFn(beam.NewDoFn):
+    class ComputeWordLengthFn(beam.DoFn):
       def process(self, element):
         yield len(element)
     # [END model_pardo_yield]
@@ -150,7 +150,7 @@ class ParDoTest(unittest.TestCase):
     words = ['a', 'bb', 'ccc', 'dddd']
 
     # [START model_pardo_side_input_dofn]
-    class FilterUsingLength(beam.NewDoFn):
+    class FilterUsingLength(beam.DoFn):
       def process(self, element, lower_bound, upper_bound=float('inf')):
         if lower_bound <= len(element) <= upper_bound:
           yield element
@@ -161,7 +161,7 @@ class ParDoTest(unittest.TestCase):
 
   def test_pardo_with_side_outputs(self):
     # [START model_pardo_emitting_values_on_side_outputs]
-    class ProcessWords(beam.NewDoFn):
+    class ProcessWords(beam.DoFn):
 
       def process(self, element, cutoff_length, marker):
         if len(element) <= cutoff_length:
@@ -261,7 +261,7 @@ class TypeHintsTest(unittest.TestCase):
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_do_fn]
       @beam.typehints.with_input_types(int)
-      class FilterEvensDoFn(beam.NewDoFn):
+      class FilterEvensDoFn(beam.DoFn):
         def process(self, element):
           if element % 2 == 0:
             yield element
@@ -358,7 +358,7 @@ class SnippetsTest(unittest.TestCase):
     def __init__(self, file_to_read=None):
       self.file_to_read = file_to_read
 
-    class ReadDoFn(beam.NewDoFn):
+    class ReadDoFn(beam.DoFn):
 
       def __init__(self, file_to_read):
         self.file_to_read = file_to_read
@@ -387,7 +387,7 @@ class SnippetsTest(unittest.TestCase):
     def __init__(self, file_to_write=None, file_name_suffix=''):
       self.file_to_write = file_to_write
 
-    class WriteDoFn(beam.NewDoFn):
+    class WriteDoFn(beam.DoFn):
       def __init__(self, file_to_write):
         self.file_to_write = file_to_write
         self.file_obj = None

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index aedf00c..4d482b0 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -31,7 +31,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
-class WordExtractingDoFn(beam.NewDoFn):
+class WordExtractingDoFn(beam.DoFn):
   """Parse each line of input text into words."""
 
   def __init__(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 03d1773..3d9cda4 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -53,7 +53,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
-class FilterTextFn(beam.NewDoFn):
+class FilterTextFn(beam.DoFn):
   """A DoFn that filters for a specific key based on a regular expression."""
   def __init__(self, pattern):
     super(FilterTextFn, self).__init__()

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index 52c5000..597a8bb 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -25,7 +25,7 @@ from googledatastore import helper as datastore_helper
 from apache_beam.io.datastore.v1 import helper
 from apache_beam.io.datastore.v1 import query_splitter
 from apache_beam.transforms import Create
-from apache_beam.transforms import NewDoFn
+from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import GroupByKey
 from apache_beam.transforms import Map
@@ -143,7 +143,7 @@ class ReadFromDatastore(PTransform):
 
     return disp_data
 
-  class SplitQueryFn(NewDoFn):
+  class SplitQueryFn(DoFn):
     """A `DoFn` that splits a given query into multiple sub-queries."""
     def __init__(self, project, query, namespace, num_splits):
       super(ReadFromDatastore.SplitQueryFn, self).__init__()
@@ -199,7 +199,7 @@ class ReadFromDatastore(PTransform):
 
       return disp_data
 
-  class ReadFn(NewDoFn):
+  class ReadFn(DoFn):
     """A DoFn that reads entities from Cloud Datastore, for a given query."""
     def __init__(self, project, namespace=None):
       super(ReadFromDatastore.ReadFn, self).__init__()
@@ -320,7 +320,7 @@ class _Mutate(PTransform):
     return {'project': self._project,
             'mutation_fn': self._mutation_fn.__class__.__name__}
 
-  class DatastoreWriteFn(NewDoFn):
+  class DatastoreWriteFn(DoFn):
     """A ``DoFn`` that write mutations to Datastore.
 
     Mutations are written in batches, where the maximum batch size is

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e26d071..a41df2c 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -781,7 +781,7 @@ class WriteImpl(ptransform.PTransform):
         min_shards)
 
 
-class _WriteBundleDoFn(core.NewDoFn):
+class _WriteBundleDoFn(core.DoFn):
   """A DoFn for writing elements to an iobase.Writer.
   Opens a writer at the first element and closes the writer at finish_bundle().
   """
@@ -803,7 +803,7 @@ class _WriteBundleDoFn(core.NewDoFn):
       yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP)
 
 
-class _WriteKeyedBundleDoFn(core.NewDoFn):
+class _WriteKeyedBundleDoFn(core.DoFn):
 
   def __init__(self, sink):
     self.sink = sink
@@ -833,7 +833,7 @@ def _finalize_write(_, sink, init_result, write_results, min_shards):
     return (window.TimestampedValue(v, window.MAX_TIMESTAMP) for v in outputs)
 
 
-class _RoundRobinKeyFn(core.NewDoFn):
+class _RoundRobinKeyFn(core.DoFn):
 
   def __init__(self, count):
     self.count = count

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 95b55b9..90b1a54 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -31,7 +31,7 @@ from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Map
-from apache_beam.transforms import NewDoFn
+from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import Read
@@ -262,10 +262,10 @@ class PipelineTest(unittest.TestCase):
     self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
 
 
-class NewDoFnTest(unittest.TestCase):
+class DoFnTest(unittest.TestCase):
 
   def test_element(self):
-    class TestDoFn(NewDoFn):
+    class TestDoFn(DoFn):
       def process(self, element):
         yield element + 10
 
@@ -275,8 +275,8 @@ class NewDoFnTest(unittest.TestCase):
     pipeline.run()
 
   def test_context_param(self):
-    class TestDoFn(NewDoFn):
-      def process(self, element, context=NewDoFn.ContextParam):
+    class TestDoFn(DoFn):
+      def process(self, element, context=DoFn.ContextParam):
         yield context.element + 10
 
     pipeline = TestPipeline()
@@ -285,7 +285,7 @@ class NewDoFnTest(unittest.TestCase):
     pipeline.run()
 
   def test_side_input_no_tag(self):
-    class TestDoFn(NewDoFn):
+    class TestDoFn(DoFn):
       def process(self, element, prefix, suffix):
         return ['%s-%s-%s' % (prefix, element, suffix)]
 
@@ -300,8 +300,8 @@ class NewDoFnTest(unittest.TestCase):
     pipeline.run()
 
   def test_side_input_tagged(self):
-    class TestDoFn(NewDoFn):
-      def process(self, element, prefix, suffix=NewDoFn.SideInputParam):
+    class TestDoFn(DoFn):
+      def process(self, element, prefix, suffix=DoFn.SideInputParam):
         return ['%s-%s-%s' % (prefix, element, suffix)]
 
     pipeline = TestPipeline()
@@ -315,8 +315,8 @@ class NewDoFnTest(unittest.TestCase):
     pipeline.run()
 
   def test_window_param(self):
-    class TestDoFn(NewDoFn):
-      def process(self, element, window=NewDoFn.WindowParam):
+    class TestDoFn(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
         yield (element, (float(window.start), float(window.end)))
 
     pipeline = TestPipeline()
@@ -330,8 +330,8 @@ class NewDoFnTest(unittest.TestCase):
     pipeline.run()
 
   def test_timestamp_param(self):
-    class TestDoFn(NewDoFn):
-      def process(self, element, timestamp=NewDoFn.TimestampParam):
+    class TestDoFn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
         yield timestamp
 
     pipeline = TestPipeline()

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0089f34..9c942c0 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -116,7 +116,7 @@ class DoFnRunner(Receiver):
       self.context = context
 
     # TODO(Sourabhbajaj): Remove the usage of OldDoFn
-    if isinstance(fn, core.NewDoFn):
+    if isinstance(fn, core.DoFn):
 
       class ArgPlaceholder(object):
         def __init__(self, placeholder):
@@ -146,7 +146,7 @@ class DoFnRunner(Receiver):
       # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated
       self.has_windowed_side_inputs = (
           self.has_windowed_side_inputs or
-          core.NewDoFn.WindowParam in defaults)
+          core.DoFn.WindowParam in defaults)
 
       # Try to prepare all the arguments that can just be filled in
       # without any additional work. in the process function.
@@ -158,9 +158,9 @@ class DoFnRunner(Receiver):
             args, kwargs, [si[global_window] for si in side_inputs])
 
       # Create placeholder for element parameter
-      if core.NewDoFn.ElementParam not in defaults:
+      if core.DoFn.ElementParam not in defaults:
         args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
-        final_args = [ArgPlaceholder(core.NewDoFn.ElementParam)] + \
+        final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \
                      self.args[:args_to_pick]
       else:
         args_to_pick = len(arguments) - len(defaults) - self_in_args
@@ -169,15 +169,15 @@ class DoFnRunner(Receiver):
       # Fill the OtherPlaceholders for context, window or timestamp
       args = iter(self.args[args_to_pick:])
       for a, d in zip(arguments[-len(defaults):], defaults):
-        if d == core.NewDoFn.ElementParam:
+        if d == core.DoFn.ElementParam:
           final_args.append(ArgPlaceholder(d))
-        elif d == core.NewDoFn.ContextParam:
+        elif d == core.DoFn.ContextParam:
           final_args.append(ArgPlaceholder(d))
-        elif d == core.NewDoFn.WindowParam:
+        elif d == core.DoFn.WindowParam:
           final_args.append(ArgPlaceholder(d))
-        elif d == core.NewDoFn.TimestampParam:
+        elif d == core.DoFn.TimestampParam:
           final_args.append(ArgPlaceholder(d))
-        elif d == core.NewDoFn.SideInputParam:
+        elif d == core.DoFn.SideInputParam:
           # If no more args are present then the value must be passed via kwarg
           try:
             final_args.append(args.next())
@@ -225,7 +225,7 @@ class DoFnRunner(Receiver):
         else:
           self.dofn_process = lambda context: fn.process(context, *args)
 
-        class CurriedFn(core.DoFn):
+        class CurriedFn(core.OldDoFn):
 
           start_bundle = staticmethod(fn.start_bundle)
           process = staticmethod(self.dofn_process)
@@ -252,13 +252,13 @@ class DoFnRunner(Receiver):
   def _new_dofn_window_process(self, element, args, kwargs, window):
     # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
     for i, p in self.placeholders:
-      if p == core.NewDoFn.ElementParam:
+      if p == core.DoFn.ElementParam:
         args[i] = element.value
-      elif p == core.NewDoFn.ContextParam:
+      elif p == core.DoFn.ContextParam:
         args[i] = self.context
-      elif p == core.NewDoFn.WindowParam:
+      elif p == core.DoFn.WindowParam:
         args[i] = window
-      elif p == core.NewDoFn.TimestampParam:
+      elif p == core.DoFn.TimestampParam:
         args[i] = element.timestamp
     if not kwargs:
       self._process_outputs(element, self.dofn_process(*args))
@@ -289,7 +289,7 @@ class DoFnRunner(Receiver):
       if self.is_new_dofn:
         _, _, _, defaults = self.dofn.get_function_arguments(method)
         defaults = defaults if defaults else []
-        args = [self.context if d == core.NewDoFn.ContextParam else d
+        args = [self.context if d == core.DoFn.ContextParam else d
                 for d in defaults]
         self._process_outputs(None, f(*args))
       else:

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 5783b19..3cc39ea 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -29,7 +29,7 @@ from apache_beam.runners.direct import DirectRunner
 from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
 from apache_beam.transforms import CoGroupByKey
 from apache_beam.transforms import Create
-from apache_beam.transforms import NewDoFn
+from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import ParDo
@@ -74,7 +74,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
   def test_side_inputs(self):
 
-    class SplitNumbersFn(NewDoFn):
+    class SplitNumbersFn(DoFn):
 
       def process(self, element):
         if element < 0:
@@ -82,7 +82,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
         else:
           yield element
 
-    class ProcessNumbersFn(NewDoFn):
+    class ProcessNumbersFn(DoFn):
 
       def process(self, element, negatives):
         yield element

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index e484bb7..374cd4e 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -43,7 +43,7 @@ class LiftedCombinePerKey(beam.PTransform):
         | beam.ParDo(FinishCombine(self._combine_fn)))
 
 
-class PartialGroupByKeyCombiningValues(beam.NewDoFn):
+class PartialGroupByKeyCombiningValues(beam.DoFn):
   """Aggregates values into a per-key-window cache.
 
   As bundles are in-memory-sized, we don't bother flushing until the very end.
@@ -54,7 +54,7 @@ class PartialGroupByKeyCombiningValues(beam.NewDoFn):
   def start_bundle(self):
     self._cache = collections.defaultdict(self._combine_fn.create_accumulator)
 
-  def process(self, element, window=beam.NewDoFn.WindowParam):
+  def process(self, element, window=beam.DoFn.WindowParam):
     k, vi = element
     self._cache[k, window] = self._combine_fn.add_input(self._cache[k, window],
                                                         vi)
@@ -76,7 +76,7 @@ class PartialGroupByKeyCombiningValues(beam.NewDoFn):
     return hints
 
 
-class FinishCombine(beam.NewDoFn):
+class FinishCombine(beam.DoFn):
   """Merges partially combined results.
   """
   def __init__(self, combine_fn):

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 13c87c5..3053fd3 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -35,10 +35,10 @@ from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
-from apache_beam.typehints.typecheck import OutputCheckWrapperNewDoFn
+from apache_beam.typehints.typecheck import OutputCheckWrapperOldDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
-from apache_beam.typehints.typecheck import TypeCheckWrapperNewDoFn
+from apache_beam.typehints.typecheck import TypeCheckWrapperOldDoFn
 from apache_beam.utils import counters
 from apache_beam.utils.pipeline_options import TypeOptions
 
@@ -351,17 +351,17 @@ class _ParDoEvaluator(_TransformEvaluator):
     if (pipeline_options is not None
         and pipeline_options.view_as(TypeOptions).runtime_type_check):
       # TODO(sourabhbajaj): Remove this if-else
-      if isinstance(dofn, core.NewDoFn):
-        dofn = TypeCheckWrapperNewDoFn(dofn, transform.get_type_hints())
-      else:
+      if isinstance(dofn, core.DoFn):
         dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints())
+      else:
+        dofn = TypeCheckWrapperOldDoFn(dofn, transform.get_type_hints())
 
     # TODO(sourabhbajaj): Remove this if-else
-    if isinstance(dofn, core.NewDoFn):
-      dofn = OutputCheckWrapperNewDoFn(
-          dofn, self._applied_ptransform.full_label)
-    else:
+    if isinstance(dofn, core.DoFn):
       dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
+    else:
+      dofn = OutputCheckWrapperOldDoFn(dofn,
+                                       self._applied_ptransform.full_label)
     self.runner = DoFnRunner(
         dofn, transform.args, transform.kwargs,
         self._side_inputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 9f96506..5c652a9 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -109,7 +109,7 @@ class RunnerTest(unittest.TestCase):
                 'a_class': SpecialParDo,
                 'a_time': self.now}
 
-    class SpecialDoFn(beam.NewDoFn):
+    class SpecialDoFn(beam.DoFn):
       def display_data(self):
         return {'dofn_value': 42}
 
@@ -146,7 +146,7 @@ class RunnerTest(unittest.TestCase):
   def test_direct_runner_metrics(self):
     from apache_beam.metrics.metric import Metrics
 
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def start_bundle(self):
         count = Metrics.counter(self.__class__, 'bundles')
         count.inc()

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index bba6bd9..91de7f6 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -112,7 +112,7 @@ class DoFnProcessContext(DoFnContext):
       self.windows = windowed_value.windows
 
 
-class NewDoFn(WithTypeHints, HasDisplayData):
+class DoFn(WithTypeHints, HasDisplayData):
   """A function object used by a transform with custom processing.
 
   The ParDo transform is such a transform. The ParDo.apply
@@ -216,7 +216,7 @@ class NewDoFn(WithTypeHints, HasDisplayData):
     return True
 
 
-# TODO(Sourabh): Remove after migration to NewDoFn
+# TODO(Sourabh): Remove after migration to DoFn
 class OldDoFn(WithTypeHints, HasDisplayData):
   """A function object used by a transform with custom processing.
 
@@ -308,7 +308,7 @@ def _fn_takes_side_inputs(fn):
   return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
 
 
-class CallableWrapperDoFn(NewDoFn):
+class CallableWrapperDoFn(DoFn):
   """A DoFn (function) object wrapping a callable object.
 
   The purpose of this class is to conveniently wrap simple functions and use
@@ -679,7 +679,7 @@ class ParDo(PTransformWithSideInputs):
   def __init__(self, fn_or_label, *args, **kwargs):
     super(ParDo, self).__init__(fn_or_label, *args, **kwargs)
 
-    if not isinstance(self.fn, (OldDoFn, NewDoFn)):
+    if not isinstance(self.fn, (OldDoFn, DoFn)):
       raise TypeError('ParDo must be called with a DoFn instance.')
 
   def default_type_hints(self):
@@ -690,7 +690,7 @@ class ParDo(PTransformWithSideInputs):
         self.fn.infer_output_type(input_type))
 
   def make_fn(self, fn):
-    if isinstance(fn, (OldDoFn, NewDoFn)):
+    if isinstance(fn, (OldDoFn, DoFn)):
       return fn
     return CallableWrapperDoFn(fn)
 
@@ -1072,7 +1072,7 @@ class CombineValues(PTransformWithSideInputs):
         *args, **kwargs)
 
 
-class CombineValuesDoFn(NewDoFn):
+class CombineValuesDoFn(DoFn):
   """DoFn for performing per-key Combine transforms."""
 
   def __init__(self, input_pcoll_type, combinefn, runtime_type_check):
@@ -1138,10 +1138,10 @@ class GroupByKey(PTransform):
   The implementation here is used only when run on the local direct runner.
   """
 
-  class ReifyWindows(NewDoFn):
+  class ReifyWindows(DoFn):
 
-    def process(self, element, window=NewDoFn.WindowParam,
-                timestamp=NewDoFn.TimestampParam):
+    def process(self, element, window=DoFn.WindowParam,
+                timestamp=DoFn.TimestampParam):
       try:
         k, v = element
       except TypeError:
@@ -1154,7 +1154,7 @@ class GroupByKey(PTransform):
       key_type, value_type = trivial_inference.key_value_types(input_type)
       return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
 
-  class GroupAlsoByWindow(NewDoFn):
+  class GroupAlsoByWindow(DoFn):
     # TODO(robertwb): Support combiner lifting.
 
     def __init__(self, windowing):
@@ -1259,10 +1259,10 @@ class Partition(PTransformWithSideInputs):
   representing each of n partitions, in order.
   """
 
-  class ApplyPartitionFnFn(NewDoFn):
+  class ApplyPartitionFnFn(DoFn):
     """A DoFn that applies a PartitionFn."""
 
-    def process(self, element, partitionfn, n, context=NewDoFn.ContextParam,
+    def process(self, element, partitionfn, n, context=DoFn.ContextParam,
                 *args, **kwargs):
       partition = partitionfn.partition_for(context, n, *args, **kwargs)
       if not 0 <= partition < n:
@@ -1329,13 +1329,13 @@ class WindowInto(ParDo):
   determined by the windowing function.
   """
 
-  class WindowIntoFn(NewDoFn):
+  class WindowIntoFn(DoFn):
     """A DoFn that applies a WindowInto operation."""
 
     def __init__(self, windowing):
       self.windowing = windowing
 
-    def process(self, element, context=NewDoFn.ContextParam):
+    def process(self, element, context=DoFn.ContextParam):
       context = WindowFn.AssignContext(context.timestamp,
                                        element=element,
                                        existing_windows=context.windows)

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 81dcdca..5a95c42 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -99,7 +99,7 @@ class DisplayDataTest(unittest.TestCase):
     self.assertEqual(display_pt.display_data(), {})
 
   def test_inheritance_dofn(self):
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       pass
 
     display_dofn = MyDoFn()
@@ -126,7 +126,7 @@ class DisplayDataTest(unittest.TestCase):
     """ Tests basic display data cases (key:value, key:dict)
     It does not test subcomponent inclusion
     """
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def __init__(self, my_display_data=None):
         self.my_display_data = my_display_data
 
@@ -168,7 +168,7 @@ class DisplayDataTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_drop_if_none(self):
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def display_data(self):
         return {'some_val': DisplayDataItem('something').drop_if_none(),
                 'non_val': DisplayDataItem(None).drop_if_none(),
@@ -183,7 +183,7 @@ class DisplayDataTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_subcomponent(self):
-    class SpecialDoFn(beam.NewDoFn):
+    class SpecialDoFn(beam.DoFn):
       def display_data(self):
         return {'dofn_value': 42}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5d09f88..0981db5 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -106,7 +106,7 @@ class PTransformTest(unittest.TestCase):
         'instead of args=(0,), kwargs={\'name\': \'value\'}')
 
   def test_do_with_do_fn(self):
-    class AddNDoFn(beam.NewDoFn):
+    class AddNDoFn(beam.DoFn):
 
       def process(self, element, addon):
         return [element + addon]
@@ -118,7 +118,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_unconstructed_do_fn(self):
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
 
       def process(self):
         pass
@@ -192,7 +192,7 @@ class PTransformTest(unittest.TestCase):
 
   @attr('ValidatesRunner')
   def test_par_do_with_multiple_outputs_and_using_yield(self):
-    class SomeDoFn(beam.NewDoFn):
+    class SomeDoFn(beam.DoFn):
       """A custom DoFn using yield."""
 
       def process(self, element):
@@ -273,7 +273,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_fn_with_start_finish(self):
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def start_bundle(self):
         yield 'start'
 
@@ -721,7 +721,7 @@ class PTransformLabelsTest(unittest.TestCase):
     self.check_label(beam.CombineGlobally(sum), r'CombineGlobally(sum)')
     self.check_label(beam.CombinePerKey(sum), r'CombinePerKey(sum)')
 
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def process(self):
         pass
 
@@ -796,7 +796,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
     @with_input_types(int, int)
     @with_output_types(typehints.List[int])
-    class AddWithFive(beam.NewDoFn):
+    class AddWithFive(beam.DoFn):
       def process(self, element, five):
         return [element + five]
 
@@ -810,7 +810,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_do_fn_pipeline_pipeline_type_check_violated(self):
     @with_input_types(str, str)
     @with_output_types(typehints.List[str])
-    class ToUpperCaseWithPrefix(beam.NewDoFn):
+    class ToUpperCaseWithPrefix(beam.DoFn):
       def process(self, element, prefix):
         return [prefix + element.upper()]
 
@@ -828,7 +828,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     @with_input_types(int, int)
     @with_output_types(int)
-    class AddWithNum(beam.NewDoFn):
+    class AddWithNum(beam.DoFn):
       def process(self, element, num):
         return [element + num]
 
@@ -844,7 +844,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     @with_input_types(int, int)
     @with_output_types(typehints.List[int])
-    class AddWithNum(beam.NewDoFn):
+    class AddWithNum(beam.DoFn):
       def process(self, element, num):
         return [element + num]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 24029ef..9375d25 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -47,8 +47,8 @@ def context(element, timestamp, windows):
 sort_values = Map(lambda (k, vs): (k, sorted(vs)))
 
 
-class ReifyWindowsFn(core.NewDoFn):
-  def process(self, element, window=core.NewDoFn.WindowParam):
+class ReifyWindowsFn(core.DoFn):
+  def process(self, element, window=core.DoFn.WindowParam):
     key, values = element
     yield "%s @ %s" % (key, window), values
 reify_windows = core.ParDo(ReifyWindowsFn())

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index bc5583f..bab5bb0 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -23,8 +23,8 @@ import sys
 import types
 
 from apache_beam.pvalue import SideOutputValue
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.core import OldDoFn
-from apache_beam.transforms.core import NewDoFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints import check_constraint
 from apache_beam.typehints import CompositeTypeHintError
@@ -35,8 +35,8 @@ from apache_beam.typehints.decorators import _check_instance_type
 from apache_beam.typehints.decorators import getcallargs_forhints
 
 
-# TODO(Sourabh): Remove after migration to NewDoFn
-class TypeCheckWrapperDoFn(OldDoFn):
+# TODO(Sourabh): Remove after migration to DoFn
+class TypeCheckWrapperOldDoFn(OldDoFn):
   """A wrapper around a DoFn which performs type-checking of input and output.
   """
 
@@ -124,8 +124,8 @@ class TypeCheckWrapperDoFn(OldDoFn):
       raise TypeCheckError, error_msg, sys.exc_info()[2]
 
 
-# TODO(Sourabh): Remove after migration to NewDoFn
-class OutputCheckWrapperDoFn(OldDoFn):
+# TODO(Sourabh): Remove after migration to DoFn
+class OutputCheckWrapperOldDoFn(OldDoFn):
   """A DoFn that verifies against common errors in the output type."""
 
   def __init__(self, dofn, full_label):
@@ -167,8 +167,8 @@ class OutputCheckWrapperDoFn(OldDoFn):
     return output
 
 
-class AbstractDoFnWrapper(NewDoFn):
-  """An abstract class to create wrapper around NewDoFn"""
+class AbstractDoFnWrapper(DoFn):
+  """An abstract class to create wrapper around DoFn"""
 
   def __init__(self, dofn):
     super(AbstractDoFnWrapper, self).__init__()
@@ -199,11 +199,11 @@ class AbstractDoFnWrapper(NewDoFn):
     return self.dofn.is_process_bounded()
 
 
-class OutputCheckWrapperNewDoFn(AbstractDoFnWrapper):
+class OutputCheckWrapperDoFn(AbstractDoFnWrapper):
   """A DoFn that verifies against common errors in the output type."""
 
   def __init__(self, dofn, full_label):
-    super(OutputCheckWrapperNewDoFn, self).__init__(dofn)
+    super(OutputCheckWrapperDoFn, self).__init__(dofn)
     self.full_label = full_label
 
   def wrapper(self, method, args, kwargs):
@@ -232,12 +232,12 @@ class OutputCheckWrapperNewDoFn(AbstractDoFnWrapper):
     return output
 
 
-class TypeCheckWrapperNewDoFn(AbstractDoFnWrapper):
+class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
   """A wrapper around a DoFn which performs type-checking of input and output.
   """
 
   def __init__(self, dofn, type_hints, label=None):
-    super(TypeCheckWrapperNewDoFn, self).__init__(dofn)
+    super(TypeCheckWrapperDoFn, self).__init__(dofn)
     self.dofn = dofn
     self._process_fn = self.dofn.process_argspec_fn()
     if type_hints.input_types:

http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 64d37bd..32f6a71 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -69,7 +69,7 @@ class MainInputTest(unittest.TestCase):
   def test_typed_dofn_class(self):
     @typehints.with_input_types(int)
     @typehints.with_output_types(str)
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def process(self, element):
         return [str(element)]
 
@@ -83,7 +83,7 @@ class MainInputTest(unittest.TestCase):
       [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
 
   def test_typed_dofn_instance(self):
-    class MyDoFn(beam.NewDoFn):
+    class MyDoFn(beam.DoFn):
       def process(self, element):
         return [str(element)]
     my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)

Reply via email to