Repository: beam Updated Branches: refs/heads/master 99f93eb07 -> 4ccbdbc38
Rename NewDoFn to DoFn Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1af093b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1af093b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1af093b1 Branch: refs/heads/master Commit: 1af093b10dda8e4daf505257d60f46b4bc38e7eb Parents: 99f93eb Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Thu Feb 2 21:29:47 2017 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Fri Feb 3 09:47:48 2017 -0800 ---------------------------------------------------------------------- .../examples/complete/top_wikipedia_sessions.py | 10 +++---- .../examples/cookbook/datastore_wordcount.py | 2 +- .../examples/cookbook/multiple_output_pardo.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 10 +++---- .../examples/snippets/snippets_test.py | 14 ++++----- sdks/python/apache_beam/examples/wordcount.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- .../apache_beam/io/datastore/v1/datastoreio.py | 8 +++--- sdks/python/apache_beam/io/iobase.py | 6 ++-- sdks/python/apache_beam/pipeline_test.py | 24 ++++++++-------- sdks/python/apache_beam/runners/common.py | 30 ++++++++++---------- .../consumer_tracking_pipeline_visitor_test.py | 6 ++-- .../runners/direct/helper_transforms.py | 6 ++-- .../runners/direct/transform_evaluator.py | 18 ++++++------ sdks/python/apache_beam/runners/runner_test.py | 4 +-- sdks/python/apache_beam/transforms/core.py | 28 +++++++++--------- .../apache_beam/transforms/display_test.py | 8 +++--- .../apache_beam/transforms/ptransform_test.py | 18 ++++++------ .../apache_beam/transforms/window_test.py | 4 +-- sdks/python/apache_beam/typehints/typecheck.py | 22 +++++++------- .../typehints/typed_pipeline_test.py | 4 +-- 21 files changed, 114 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 43a4ee2..d7fbe30 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -59,7 +59,7 @@ THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS MAX_TIMESTAMP = 0x7fffffffffffffff -class ExtractUserAndTimestampDoFn(beam.NewDoFn): +class ExtractUserAndTimestampDoFn(beam.DoFn): """Extracts user and timestamp representing a Wikipedia edit.""" def process(self, element): @@ -103,17 +103,17 @@ class TopPerMonth(beam.PTransform): .without_defaults()) -class SessionsToStringsDoFn(beam.NewDoFn): +class SessionsToStringsDoFn(beam.DoFn): """Adds the session information to be part of the key.""" - def process(self, element, window=beam.NewDoFn.WindowParam): + def process(self, element, window=beam.DoFn.WindowParam): yield (element[0] + ' : ' + str(window), element[1]) -class FormatOutputDoFn(beam.NewDoFn): +class FormatOutputDoFn(beam.DoFn): """Formats a string containing the user, count, and session.""" - def process(self, element, window=beam.NewDoFn.WindowParam): + def process(self, element, window=beam.DoFn.WindowParam): for kv in element: session = kv[0] count = kv[1] http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 42cd59b..282afbf 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -84,7 +84,7 @@ word_length_counter = Metrics.counter('main', 'word_lengths') word_counter = Metrics.counter('main', 'total_words') -class WordExtractingDoFn(beam.NewDoFn): +class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" def process(self, element): http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 0cbbf9a..26e97c7 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -60,7 +60,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import SetupOptions -class SplitLinesToWordsFn(beam.NewDoFn): +class SplitLinesToWordsFn(beam.DoFn): """A transform to split a line of text into individual words. This transform will have 3 outputs: http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 105e3fb..42d194e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -305,7 +305,7 @@ def pipeline_logging(lines, output): # import Python logging module. import logging - class ExtractWordsFn(beam.NewDoFn): + class ExtractWordsFn(beam.DoFn): def process(self, element): words = re.findall(r'[A-Za-z\']+', element) @@ -346,14 +346,14 @@ def pipeline_monitoring(renames): help='output for the pipeline', default='gs://my-bucket/output') - class ExtractWordsFn(beam.NewDoFn): + class ExtractWordsFn(beam.DoFn): def process(self, element): words = re.findall(r'[A-Za-z\']+', element) for word in words: yield word - class FormatCountsFn(beam.NewDoFn): + class FormatCountsFn(beam.DoFn): def process(self, element): word, count = element @@ -490,7 +490,7 @@ def examples_wordcount_wordcount(renames): # [END examples_wordcount_wordcount_composite] # [START examples_wordcount_wordcount_dofn] - class FormatAsTextFn(beam.NewDoFn): + class FormatAsTextFn(beam.DoFn): def process(self, element): word, count = element @@ -514,7 +514,7 @@ def examples_wordcount_debugging(renames): # [START example_wordcount_debugging_aggregators] import logging - class FilterTextFn(beam.NewDoFn): + class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regular expression.""" def __init__(self, pattern): http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index dcc4add..a602b66 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -47,7 +47,7 @@ class ParDoTest(unittest.TestCase): words = ['aa', 'bbb', 'c'] # [START model_pardo_pardo] - class ComputeWordLengthFn(beam.NewDoFn): + class ComputeWordLengthFn(beam.DoFn): def process(self, element): return [len(element)] # [END model_pardo_pardo] @@ -62,7 +62,7 @@ class ParDoTest(unittest.TestCase): words = ['aa', 'bbb', 'c'] # [START model_pardo_yield] - class ComputeWordLengthFn(beam.NewDoFn): + class ComputeWordLengthFn(beam.DoFn): def process(self, element): yield len(element) # [END model_pardo_yield] @@ -150,7 +150,7 @@ class ParDoTest(unittest.TestCase): words = ['a', 'bb', 'ccc', 'dddd'] # [START model_pardo_side_input_dofn] - class FilterUsingLength(beam.NewDoFn): + class FilterUsingLength(beam.DoFn): def process(self, element, lower_bound, upper_bound=float('inf')): if lower_bound <= len(element) <= upper_bound: yield element @@ -161,7 +161,7 @@ class ParDoTest(unittest.TestCase): def test_pardo_with_side_outputs(self): # [START model_pardo_emitting_values_on_side_outputs] - class ProcessWords(beam.NewDoFn): + class ProcessWords(beam.DoFn): def process(self, element, cutoff_length, marker): if len(element) <= cutoff_length: @@ -261,7 +261,7 @@ class TypeHintsTest(unittest.TestCase): with self.assertRaises(typehints.TypeCheckError): # [START type_hints_do_fn] @beam.typehints.with_input_types(int) - class FilterEvensDoFn(beam.NewDoFn): + class FilterEvensDoFn(beam.DoFn): def process(self, element): if element % 2 == 0: yield element @@ -358,7 +358,7 @@ class SnippetsTest(unittest.TestCase): def __init__(self, file_to_read=None): self.file_to_read = file_to_read - class ReadDoFn(beam.NewDoFn): + class ReadDoFn(beam.DoFn): def __init__(self, file_to_read): self.file_to_read = file_to_read @@ -387,7 +387,7 @@ class SnippetsTest(unittest.TestCase): def __init__(self, file_to_write=None, file_name_suffix=''): self.file_to_write = file_to_write - class WriteDoFn(beam.NewDoFn): + class WriteDoFn(beam.DoFn): def __init__(self, file_to_write): self.file_to_write = file_to_write self.file_obj = None http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index aedf00c..4d482b0 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -31,7 +31,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import SetupOptions -class WordExtractingDoFn(beam.NewDoFn): +class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" def __init__(self): http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 03d1773..3d9cda4 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -53,7 +53,7 @@ from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import SetupOptions -class FilterTextFn(beam.NewDoFn): +class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regular expression.""" def __init__(self, pattern): super(FilterTextFn, self).__init__() http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/io/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index 52c5000..597a8bb 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -25,7 +25,7 @@ from googledatastore import helper as datastore_helper from apache_beam.io.datastore.v1 import helper from apache_beam.io.datastore.v1 import query_splitter from apache_beam.transforms import Create -from apache_beam.transforms import NewDoFn +from apache_beam.transforms import DoFn from apache_beam.transforms import FlatMap from apache_beam.transforms import GroupByKey from apache_beam.transforms import Map @@ -143,7 +143,7 @@ class ReadFromDatastore(PTransform): return disp_data - class SplitQueryFn(NewDoFn): + class SplitQueryFn(DoFn): """A `DoFn` that splits a given query into multiple sub-queries.""" def __init__(self, project, query, namespace, num_splits): super(ReadFromDatastore.SplitQueryFn, self).__init__() @@ -199,7 +199,7 @@ class ReadFromDatastore(PTransform): return disp_data - class ReadFn(NewDoFn): + class ReadFn(DoFn): """A DoFn that reads entities from Cloud Datastore, for a given query.""" def __init__(self, project, namespace=None): super(ReadFromDatastore.ReadFn, self).__init__() @@ -320,7 +320,7 @@ class _Mutate(PTransform): return {'project': self._project, 'mutation_fn': self._mutation_fn.__class__.__name__} - class DatastoreWriteFn(NewDoFn): + class DatastoreWriteFn(DoFn): """A ``DoFn`` that write mutations to Datastore. Mutations are written in batches, where the maximum batch size is http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index e26d071..a41df2c 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -781,7 +781,7 @@ class WriteImpl(ptransform.PTransform): min_shards) -class _WriteBundleDoFn(core.NewDoFn): +class _WriteBundleDoFn(core.DoFn): """A DoFn for writing elements to an iobase.Writer. Opens a writer at the first element and closes the writer at finish_bundle(). """ @@ -803,7 +803,7 @@ class _WriteBundleDoFn(core.NewDoFn): yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP) -class _WriteKeyedBundleDoFn(core.NewDoFn): +class _WriteKeyedBundleDoFn(core.DoFn): def __init__(self, sink): self.sink = sink @@ -833,7 +833,7 @@ def _finalize_write(_, sink, init_result, write_results, min_shards): return (window.TimestampedValue(v, window.MAX_TIMESTAMP) for v in outputs) -class _RoundRobinKeyFn(core.NewDoFn): +class _RoundRobinKeyFn(core.DoFn): def __init__(self, count): self.count = count http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 95b55b9..90b1a54 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -31,7 +31,7 @@ from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Map -from apache_beam.transforms import NewDoFn +from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform from apache_beam.transforms import Read @@ -262,10 +262,10 @@ class PipelineTest(unittest.TestCase): self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) -class NewDoFnTest(unittest.TestCase): +class DoFnTest(unittest.TestCase): def test_element(self): - class TestDoFn(NewDoFn): + class TestDoFn(DoFn): def process(self, element): yield element + 10 @@ -275,8 +275,8 @@ class NewDoFnTest(unittest.TestCase): pipeline.run() def test_context_param(self): - class TestDoFn(NewDoFn): - def process(self, element, context=NewDoFn.ContextParam): + class TestDoFn(DoFn): + def process(self, element, context=DoFn.ContextParam): yield context.element + 10 pipeline = TestPipeline() @@ -285,7 +285,7 @@ class NewDoFnTest(unittest.TestCase): pipeline.run() def test_side_input_no_tag(self): - class TestDoFn(NewDoFn): + class TestDoFn(DoFn): def process(self, element, prefix, suffix): return ['%s-%s-%s' % (prefix, element, suffix)] @@ -300,8 +300,8 @@ class NewDoFnTest(unittest.TestCase): pipeline.run() def test_side_input_tagged(self): - class TestDoFn(NewDoFn): - def process(self, element, prefix, suffix=NewDoFn.SideInputParam): + class TestDoFn(DoFn): + def process(self, element, prefix, suffix=DoFn.SideInputParam): return ['%s-%s-%s' % (prefix, element, suffix)] pipeline = TestPipeline() @@ -315,8 +315,8 @@ class NewDoFnTest(unittest.TestCase): pipeline.run() def test_window_param(self): - class TestDoFn(NewDoFn): - def process(self, element, window=NewDoFn.WindowParam): + class TestDoFn(DoFn): + def process(self, element, window=DoFn.WindowParam): yield (element, (float(window.start), float(window.end))) pipeline = TestPipeline() @@ -330,8 +330,8 @@ class NewDoFnTest(unittest.TestCase): pipeline.run() def test_timestamp_param(self): - class TestDoFn(NewDoFn): - def process(self, element, timestamp=NewDoFn.TimestampParam): + class TestDoFn(DoFn): + def process(self, element, timestamp=DoFn.TimestampParam): yield timestamp pipeline = TestPipeline() http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 0089f34..9c942c0 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -116,7 +116,7 @@ class DoFnRunner(Receiver): self.context = context # TODO(Sourabhbajaj): Remove the usage of OldDoFn - if isinstance(fn, core.NewDoFn): + if isinstance(fn, core.DoFn): class ArgPlaceholder(object): def __init__(self, placeholder): @@ -146,7 +146,7 @@ class DoFnRunner(Receiver): # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated self.has_windowed_side_inputs = ( self.has_windowed_side_inputs or - core.NewDoFn.WindowParam in defaults) + core.DoFn.WindowParam in defaults) # Try to prepare all the arguments that can just be filled in # without any additional work. in the process function. @@ -158,9 +158,9 @@ class DoFnRunner(Receiver): args, kwargs, [si[global_window] for si in side_inputs]) # Create placeholder for element parameter - if core.NewDoFn.ElementParam not in defaults: + if core.DoFn.ElementParam not in defaults: args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args - final_args = [ArgPlaceholder(core.NewDoFn.ElementParam)] + \ + final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \ self.args[:args_to_pick] else: args_to_pick = len(arguments) - len(defaults) - self_in_args @@ -169,15 +169,15 @@ class DoFnRunner(Receiver): # Fill the OtherPlaceholders for context, window or timestamp args = iter(self.args[args_to_pick:]) for a, d in zip(arguments[-len(defaults):], defaults): - if d == core.NewDoFn.ElementParam: + if d == core.DoFn.ElementParam: final_args.append(ArgPlaceholder(d)) - elif d == core.NewDoFn.ContextParam: + elif d == core.DoFn.ContextParam: final_args.append(ArgPlaceholder(d)) - elif d == core.NewDoFn.WindowParam: + elif d == core.DoFn.WindowParam: final_args.append(ArgPlaceholder(d)) - elif d == core.NewDoFn.TimestampParam: + elif d == core.DoFn.TimestampParam: final_args.append(ArgPlaceholder(d)) - elif d == core.NewDoFn.SideInputParam: + elif d == core.DoFn.SideInputParam: # If no more args are present then the value must be passed via kwarg try: final_args.append(args.next()) @@ -225,7 +225,7 @@ class DoFnRunner(Receiver): else: self.dofn_process = lambda context: fn.process(context, *args) - class CurriedFn(core.DoFn): + class CurriedFn(core.OldDoFn): start_bundle = staticmethod(fn.start_bundle) process = staticmethod(self.dofn_process) @@ -252,13 +252,13 @@ class DoFnRunner(Receiver): def _new_dofn_window_process(self, element, args, kwargs, window): # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == for i, p in self.placeholders: - if p == core.NewDoFn.ElementParam: + if p == core.DoFn.ElementParam: args[i] = element.value - elif p == core.NewDoFn.ContextParam: + elif p == core.DoFn.ContextParam: args[i] = self.context - elif p == core.NewDoFn.WindowParam: + elif p == core.DoFn.WindowParam: args[i] = window - elif p == core.NewDoFn.TimestampParam: + elif p == core.DoFn.TimestampParam: args[i] = element.timestamp if not kwargs: self._process_outputs(element, self.dofn_process(*args)) @@ -289,7 +289,7 @@ class DoFnRunner(Receiver): if self.is_new_dofn: _, _, _, defaults = self.dofn.get_function_arguments(method) defaults = defaults if defaults else [] - args = [self.context if d == core.NewDoFn.ContextParam else d + args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] self._process_outputs(None, f(*args)) else: http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 5783b19..3cc39ea 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -29,7 +29,7 @@ from apache_beam.runners.direct import DirectRunner from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor from apache_beam.transforms import CoGroupByKey from apache_beam.transforms import Create -from apache_beam.transforms import NewDoFn +from apache_beam.transforms import DoFn from apache_beam.transforms import FlatMap from apache_beam.transforms import Flatten from apache_beam.transforms import ParDo @@ -74,7 +74,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): def test_side_inputs(self): - class SplitNumbersFn(NewDoFn): + class SplitNumbersFn(DoFn): def process(self, element): if element < 0: @@ -82,7 +82,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): else: yield element - class ProcessNumbersFn(NewDoFn): + class ProcessNumbersFn(DoFn): def process(self, element, negatives): yield element http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/helper_transforms.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py index e484bb7..374cd4e 100644 --- a/sdks/python/apache_beam/runners/direct/helper_transforms.py +++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py @@ -43,7 +43,7 @@ class LiftedCombinePerKey(beam.PTransform): | beam.ParDo(FinishCombine(self._combine_fn))) -class PartialGroupByKeyCombiningValues(beam.NewDoFn): +class PartialGroupByKeyCombiningValues(beam.DoFn): """Aggregates values into a per-key-window cache. As bundles are in-memory-sized, we don't bother flushing until the very end. @@ -54,7 +54,7 @@ class PartialGroupByKeyCombiningValues(beam.NewDoFn): def start_bundle(self): self._cache = collections.defaultdict(self._combine_fn.create_accumulator) - def process(self, element, window=beam.NewDoFn.WindowParam): + def process(self, element, window=beam.DoFn.WindowParam): k, vi = element self._cache[k, window] = self._combine_fn.add_input(self._cache[k, window], vi) @@ -76,7 +76,7 @@ class PartialGroupByKeyCombiningValues(beam.NewDoFn): return hints -class FinishCombine(beam.NewDoFn): +class FinishCombine(beam.DoFn): """Merges partially combined results. """ def __init__(self, combine_fn): http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 13c87c5..3053fd3 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -35,10 +35,10 @@ from apache_beam.transforms import sideinputs from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn -from apache_beam.typehints.typecheck import OutputCheckWrapperNewDoFn +from apache_beam.typehints.typecheck import OutputCheckWrapperOldDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn -from apache_beam.typehints.typecheck import TypeCheckWrapperNewDoFn +from apache_beam.typehints.typecheck import TypeCheckWrapperOldDoFn from apache_beam.utils import counters from apache_beam.utils.pipeline_options import TypeOptions @@ -351,17 +351,17 @@ class _ParDoEvaluator(_TransformEvaluator): if (pipeline_options is not None and pipeline_options.view_as(TypeOptions).runtime_type_check): # TODO(sourabhbajaj): Remove this if-else - if isinstance(dofn, core.NewDoFn): - dofn = TypeCheckWrapperNewDoFn(dofn, transform.get_type_hints()) - else: + if isinstance(dofn, core.DoFn): dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints()) + else: + dofn = TypeCheckWrapperOldDoFn(dofn, transform.get_type_hints()) # TODO(sourabhbajaj): Remove this if-else - if isinstance(dofn, core.NewDoFn): - dofn = OutputCheckWrapperNewDoFn( - dofn, self._applied_ptransform.full_label) - else: + if isinstance(dofn, core.DoFn): dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label) + else: + dofn = OutputCheckWrapperOldDoFn(dofn, + self._applied_ptransform.full_label) self.runner = DoFnRunner( dofn, transform.args, transform.kwargs, self._side_inputs, http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 9f96506..5c652a9 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -109,7 +109,7 @@ class RunnerTest(unittest.TestCase): 'a_class': SpecialParDo, 'a_time': self.now} - class SpecialDoFn(beam.NewDoFn): + class SpecialDoFn(beam.DoFn): def display_data(self): return {'dofn_value': 42} @@ -146,7 +146,7 @@ class RunnerTest(unittest.TestCase): def test_direct_runner_metrics(self): from apache_beam.metrics.metric import Metrics - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def start_bundle(self): count = Metrics.counter(self.__class__, 'bundles') count.inc() http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bba6bd9..91de7f6 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -112,7 +112,7 @@ class DoFnProcessContext(DoFnContext): self.windows = windowed_value.windows -class NewDoFn(WithTypeHints, HasDisplayData): +class DoFn(WithTypeHints, HasDisplayData): """A function object used by a transform with custom processing. The ParDo transform is such a transform. The ParDo.apply @@ -216,7 +216,7 @@ class NewDoFn(WithTypeHints, HasDisplayData): return True -# TODO(Sourabh): Remove after migration to NewDoFn +# TODO(Sourabh): Remove after migration to DoFn class OldDoFn(WithTypeHints, HasDisplayData): """A function object used by a transform with custom processing. @@ -308,7 +308,7 @@ def _fn_takes_side_inputs(fn): return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords -class CallableWrapperDoFn(NewDoFn): +class CallableWrapperDoFn(DoFn): """A DoFn (function) object wrapping a callable object. The purpose of this class is to conveniently wrap simple functions and use @@ -679,7 +679,7 @@ class ParDo(PTransformWithSideInputs): def __init__(self, fn_or_label, *args, **kwargs): super(ParDo, self).__init__(fn_or_label, *args, **kwargs) - if not isinstance(self.fn, (OldDoFn, NewDoFn)): + if not isinstance(self.fn, (OldDoFn, DoFn)): raise TypeError('ParDo must be called with a DoFn instance.') def default_type_hints(self): @@ -690,7 +690,7 @@ class ParDo(PTransformWithSideInputs): self.fn.infer_output_type(input_type)) def make_fn(self, fn): - if isinstance(fn, (OldDoFn, NewDoFn)): + if isinstance(fn, (OldDoFn, DoFn)): return fn return CallableWrapperDoFn(fn) @@ -1072,7 +1072,7 @@ class CombineValues(PTransformWithSideInputs): *args, **kwargs) -class CombineValuesDoFn(NewDoFn): +class CombineValuesDoFn(DoFn): """DoFn for performing per-key Combine transforms.""" def __init__(self, input_pcoll_type, combinefn, runtime_type_check): @@ -1138,10 +1138,10 @@ class GroupByKey(PTransform): The implementation here is used only when run on the local direct runner. """ - class ReifyWindows(NewDoFn): + class ReifyWindows(DoFn): - def process(self, element, window=NewDoFn.WindowParam, - timestamp=NewDoFn.TimestampParam): + def process(self, element, window=DoFn.WindowParam, + timestamp=DoFn.TimestampParam): try: k, v = element except TypeError: @@ -1154,7 +1154,7 @@ class GroupByKey(PTransform): key_type, value_type = trivial_inference.key_value_types(input_type) return Iterable[KV[key_type, typehints.WindowedValue[value_type]]] - class GroupAlsoByWindow(NewDoFn): + class GroupAlsoByWindow(DoFn): # TODO(robertwb): Support combiner lifting. def __init__(self, windowing): @@ -1259,10 +1259,10 @@ class Partition(PTransformWithSideInputs): representing each of n partitions, in order. """ - class ApplyPartitionFnFn(NewDoFn): + class ApplyPartitionFnFn(DoFn): """A DoFn that applies a PartitionFn.""" - def process(self, element, partitionfn, n, context=NewDoFn.ContextParam, + def process(self, element, partitionfn, n, context=DoFn.ContextParam, *args, **kwargs): partition = partitionfn.partition_for(context, n, *args, **kwargs) if not 0 <= partition < n: @@ -1329,13 +1329,13 @@ class WindowInto(ParDo): determined by the windowing function. """ - class WindowIntoFn(NewDoFn): + class WindowIntoFn(DoFn): """A DoFn that applies a WindowInto operation.""" def __init__(self, windowing): self.windowing = windowing - def process(self, element, context=NewDoFn.ContextParam): + def process(self, element, context=DoFn.ContextParam): context = WindowFn.AssignContext(context.timestamp, element=element, existing_windows=context.windows) http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/display_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 81dcdca..5a95c42 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -99,7 +99,7 @@ class DisplayDataTest(unittest.TestCase): self.assertEqual(display_pt.display_data(), {}) def test_inheritance_dofn(self): - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): pass display_dofn = MyDoFn() @@ -126,7 +126,7 @@ class DisplayDataTest(unittest.TestCase): """ Tests basic display data cases (key:value, key:dict) It does not test subcomponent inclusion """ - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def __init__(self, my_display_data=None): self.my_display_data = my_display_data @@ -168,7 +168,7 @@ class DisplayDataTest(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_drop_if_none(self): - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def display_data(self): return {'some_val': DisplayDataItem('something').drop_if_none(), 'non_val': DisplayDataItem(None).drop_if_none(), @@ -183,7 +183,7 @@ class DisplayDataTest(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_subcomponent(self): - class SpecialDoFn(beam.NewDoFn): + class SpecialDoFn(beam.DoFn): def display_data(self): return {'dofn_value': 42} http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 5d09f88..0981db5 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -106,7 +106,7 @@ class PTransformTest(unittest.TestCase): 'instead of args=(0,), kwargs={\'name\': \'value\'}') def test_do_with_do_fn(self): - class AddNDoFn(beam.NewDoFn): + class AddNDoFn(beam.DoFn): def process(self, element, addon): return [element + addon] @@ -118,7 +118,7 @@ class PTransformTest(unittest.TestCase): pipeline.run() def test_do_with_unconstructed_do_fn(self): - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def process(self): pass @@ -192,7 +192,7 @@ class PTransformTest(unittest.TestCase): @attr('ValidatesRunner') def test_par_do_with_multiple_outputs_and_using_yield(self): - class SomeDoFn(beam.NewDoFn): + class SomeDoFn(beam.DoFn): """A custom DoFn using yield.""" def process(self, element): @@ -273,7 +273,7 @@ class PTransformTest(unittest.TestCase): self.assertStartswith(cm.exception.message, expected_error_prefix) def test_do_fn_with_start_finish(self): - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def start_bundle(self): yield 'start' @@ -721,7 +721,7 @@ class PTransformLabelsTest(unittest.TestCase): self.check_label(beam.CombineGlobally(sum), r'CombineGlobally(sum)') self.check_label(beam.CombinePerKey(sum), r'CombinePerKey(sum)') - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def process(self): pass @@ -796,7 +796,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_do_fn_pipeline_pipeline_type_check_satisfied(self): @with_input_types(int, int) @with_output_types(typehints.List[int]) - class AddWithFive(beam.NewDoFn): + class AddWithFive(beam.DoFn): def process(self, element, five): return [element + five] @@ -810,7 +810,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_do_fn_pipeline_pipeline_type_check_violated(self): @with_input_types(str, str) @with_output_types(typehints.List[str]) - class ToUpperCaseWithPrefix(beam.NewDoFn): + class ToUpperCaseWithPrefix(beam.DoFn): def process(self, element, prefix): return [prefix + element.upper()] @@ -828,7 +828,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): @with_input_types(int, int) @with_output_types(int) - class AddWithNum(beam.NewDoFn): + class AddWithNum(beam.DoFn): def process(self, element, num): return [element + num] @@ -844,7 +844,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): @with_input_types(int, int) @with_output_types(typehints.List[int]) - class AddWithNum(beam.NewDoFn): + class AddWithNum(beam.DoFn): def process(self, element, num): return [element + num] http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 24029ef..9375d25 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -47,8 +47,8 @@ def context(element, timestamp, windows): sort_values = Map(lambda (k, vs): (k, sorted(vs))) -class ReifyWindowsFn(core.NewDoFn): - def process(self, element, window=core.NewDoFn.WindowParam): +class ReifyWindowsFn(core.DoFn): + def process(self, element, window=core.DoFn.WindowParam): key, values = element yield "%s @ %s" % (key, window), values reify_windows = core.ParDo(ReifyWindowsFn()) http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index bc5583f..bab5bb0 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -23,8 +23,8 @@ import sys import types from apache_beam.pvalue import SideOutputValue +from apache_beam.transforms.core import DoFn from apache_beam.transforms.core import OldDoFn -from apache_beam.transforms.core import NewDoFn from apache_beam.transforms.window import WindowedValue from apache_beam.typehints import check_constraint from apache_beam.typehints import CompositeTypeHintError @@ -35,8 +35,8 @@ from apache_beam.typehints.decorators import _check_instance_type from apache_beam.typehints.decorators import getcallargs_forhints -# TODO(Sourabh): Remove after migration to NewDoFn -class TypeCheckWrapperDoFn(OldDoFn): +# TODO(Sourabh): Remove after migration to DoFn +class TypeCheckWrapperOldDoFn(OldDoFn): """A wrapper around a DoFn which performs type-checking of input and output. """ @@ -124,8 +124,8 @@ class TypeCheckWrapperDoFn(OldDoFn): raise TypeCheckError, error_msg, sys.exc_info()[2] -# TODO(Sourabh): Remove after migration to NewDoFn -class OutputCheckWrapperDoFn(OldDoFn): +# TODO(Sourabh): Remove after migration to DoFn +class OutputCheckWrapperOldDoFn(OldDoFn): """A DoFn that verifies against common errors in the output type.""" def __init__(self, dofn, full_label): @@ -167,8 +167,8 @@ class OutputCheckWrapperDoFn(OldDoFn): return output -class AbstractDoFnWrapper(NewDoFn): - """An abstract class to create wrapper around NewDoFn""" +class AbstractDoFnWrapper(DoFn): + """An abstract class to create wrapper around DoFn""" def __init__(self, dofn): super(AbstractDoFnWrapper, self).__init__() @@ -199,11 +199,11 @@ class AbstractDoFnWrapper(NewDoFn): return self.dofn.is_process_bounded() -class OutputCheckWrapperNewDoFn(AbstractDoFnWrapper): +class OutputCheckWrapperDoFn(AbstractDoFnWrapper): """A DoFn that verifies against common errors in the output type.""" def __init__(self, dofn, full_label): - super(OutputCheckWrapperNewDoFn, self).__init__(dofn) + super(OutputCheckWrapperDoFn, self).__init__(dofn) self.full_label = full_label def wrapper(self, method, args, kwargs): @@ -232,12 +232,12 @@ class OutputCheckWrapperNewDoFn(AbstractDoFnWrapper): return output -class TypeCheckWrapperNewDoFn(AbstractDoFnWrapper): +class TypeCheckWrapperDoFn(AbstractDoFnWrapper): """A wrapper around a DoFn which performs type-checking of input and output. """ def __init__(self, dofn, type_hints, label=None): - super(TypeCheckWrapperNewDoFn, self).__init__(dofn) + super(TypeCheckWrapperDoFn, self).__init__(dofn) self.dofn = dofn self._process_fn = self.dofn.process_argspec_fn() if type_hints.input_types: http://git-wip-us.apache.org/repos/asf/beam/blob/1af093b1/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 64d37bd..32f6a71 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -69,7 +69,7 @@ class MainInputTest(unittest.TestCase): def test_typed_dofn_class(self): @typehints.with_input_types(int) @typehints.with_output_types(str) - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def process(self, element): return [str(element)] @@ -83,7 +83,7 @@ class MainInputTest(unittest.TestCase): [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_instance(self): - class MyDoFn(beam.NewDoFn): + class MyDoFn(beam.DoFn): def process(self, element): return [str(element)] my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)