Repository: beam Updated Branches: refs/heads/master 836e8e4aa -> fc1006500
Create as custom source Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33d4a02b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33d4a02b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33d4a02b Branch: refs/heads/master Commit: 33d4a02bb28e5a1c09513dc3e7701b30df148943 Parents: 836e8e4 Author: Vikas Kedigehalli <vika...@google.com> Authored: Mon Apr 3 10:01:45 2017 -0700 Committer: Chamikara Jayalath <chamik...@google.com> Committed: Mon Apr 10 14:17:15 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/internal/pickler.py | 22 ++-- sdks/python/apache_beam/pipeline.py | 4 +- sdks/python/apache_beam/pipeline_test.py | 13 +- .../runners/dataflow/dataflow_runner.py | 24 ---- .../runners/dataflow/dataflow_runner_test.py | 2 +- .../consumer_tracking_pipeline_visitor_test.py | 22 ++-- .../runners/direct/transform_evaluator.py | 31 ----- sdks/python/apache_beam/transforms/core.py | 92 +++++++++++++- .../apache_beam/transforms/create_test.py | 121 +++++++++++++++++++ .../apache_beam/transforms/ptransform_test.py | 14 +-- 10 files changed, 254 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/internal/pickler.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 67f9fc3..a4ab7b9 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -181,12 +181,15 @@ logging.getLogger('dill').setLevel(logging.WARN) # TODO(ccy): Currently, there are still instances of pickler.dumps() and # pickler.loads() being used for data, which results in an unnecessary base64 # encoding. This should be cleaned up. -def dumps(o): +def dumps(o, enable_trace=True): try: s = dill.dumps(o) - except Exception: # pylint: disable=broad-except - dill.dill._trace(True) # pylint: disable=protected-access - s = dill.dumps(o) + except Exception as e: # pylint: disable=broad-except + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + s = dill.dumps(o) + else: + raise e finally: dill.dill._trace(False) # pylint: disable=protected-access @@ -199,7 +202,7 @@ def dumps(o): return base64.b64encode(c) -def loads(encoded): +def loads(encoded, enable_trace=True): c = base64.b64decode(encoded) s = zlib.decompress(c) @@ -207,9 +210,12 @@ def loads(encoded): try: return dill.loads(s) - except Exception: # pylint: disable=broad-except - dill.dill._trace(True) # pylint: disable=protected-access - return dill.loads(s) + except Exception as e: # pylint: disable=broad-except + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + return dill.loads(s) + else: + raise e finally: dill.dill._trace(False) # pylint: disable=protected-access http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index b93167d..2ff9eb3 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -315,7 +315,9 @@ class Pipeline(object): Visitor.ok = False try: # Transforms must be picklable. - pickler.loads(pickler.dumps(transform_node.transform)) + pickler.loads(pickler.dumps(transform_node.transform, + enable_trace=False), + enable_trace=False) except Exception: Visitor.ok = False http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ba219bf..6314609 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -173,9 +173,9 @@ class PipelineTest(unittest.TestCase): set(visitor.visited)) self.assertEqual(set(visitor.enter_composite), set(visitor.leave_composite)) - self.assertEqual(2, len(visitor.enter_composite)) - self.assertEqual(visitor.enter_composite[1].transform, transform) - self.assertEqual(visitor.leave_composite[0].transform, transform) + self.assertEqual(3, len(visitor.enter_composite)) + self.assertEqual(visitor.enter_composite[2].transform, transform) + self.assertEqual(visitor.leave_composite[1].transform, transform) def test_apply_custom_transform(self): pipeline = TestPipeline() @@ -280,9 +280,10 @@ class PipelineTest(unittest.TestCase): # pylint: disable=expression-not-assigned p | Create([ValueError]) | Map(raise_exception) - def test_eager_pipeline(self): - p = Pipeline('EagerRunner') - self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) + # TODO(BEAM-1894). + # def test_eager_pipeline(self): + # p = Pipeline('EagerRunner') + # self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) class DoFnTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f0600bc..1a935c1 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -21,7 +21,6 @@ The runner will create a JSON description of the job graph and then submit it to the Dataflow Service for remote execution by a worker. """ -import base64 import logging import threading import time @@ -261,29 +260,6 @@ class DataflowRunner(PipelineRunner): return step - def run_Create(self, transform_node): - transform = transform_node.transform - step = self._add_step(TransformNames.CREATE_PCOLLECTION, - transform_node.full_label, transform_node) - # TODO(silviuc): Eventually use a coder based on typecoders. - # Note that we base64-encode values here so that the service will accept - # the values. - element_coder = coders.PickleCoder() - step.add_property( - PropertyNames.ELEMENT, - [base64.b64encode(element_coder.encode(v)) - for v in transform.value]) - # The service expects a WindowedValueCoder here, so we wrap the actual - # encoding in a WindowedValueCoder. - step.encoding = self._get_cloud_encoding( - coders.WindowedValueCoder(element_coder)) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def _add_singleton_step(self, label, full_label, tag, input_step): """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" # Import here to avoid adding the dependency for local running scenarios. http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index cc5928a..b9ed84d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -142,7 +142,7 @@ class DataflowRunnerTest(unittest.TestCase): steps = [step for step in job_dict['steps'] if len(step['properties'].get('display_data', [])) > 0] - step = steps[0] + step = steps[1] disp_data = step['properties']['display_data'] disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key']) nspace = SpecialParDo.__module__+ '.' http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index eb8b14b..154284b 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -46,8 +46,6 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): self.visitor = ConsumerTrackingPipelineVisitor() def test_root_transforms(self): - root_create = Create([[1, 2, 3]]) - class DummySource(iobase.BoundedSource): pass @@ -55,9 +53,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_flatten = Flatten(pipeline=self.pipeline) pbegin = pvalue.PBegin(self.pipeline) - pcoll_create = pbegin | 'create' >> root_create - pbegin | 'read' >> root_read - pcoll_create | FlatMap(lambda x: x) + pcoll_read = pbegin | 'read' >> root_read + pcoll_read | FlatMap(lambda x: x) [] | 'flatten' >> root_flatten self.pipeline.visit(self.visitor) @@ -66,12 +63,12 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): [t.transform for t in self.visitor.root_transforms]) self.assertEqual(root_transforms, sorted( - [root_read, root_create, root_flatten])) + [root_read, root_flatten])) pbegin_consumers = sorted( [c.transform for c in self.visitor.value_to_consumers[pbegin]]) - self.assertEqual(pbegin_consumers, sorted([root_read, root_create])) - self.assertEqual(len(self.visitor.step_names), 4) + self.assertEqual(pbegin_consumers, sorted([root_read])) + self.assertEqual(len(self.visitor.step_names), 3) def test_side_inputs(self): @@ -88,10 +85,13 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): def process(self, element, negatives): yield element - root_create = Create([[-1, 2, 3]]) + class DummySource(iobase.BoundedSource): + pass + + root_read = Read(DummySource()) result = (self.pipeline - | 'create' >> root_create + | 'read' >> root_read | ParDo(SplitNumbersFn()).with_outputs('tag_negative', main='positive')) positive, negative = result @@ -101,7 +101,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_transforms = sorted( [t.transform for t in self.visitor.root_transforms]) - self.assertEqual(root_transforms, sorted([root_create])) + self.assertEqual(root_transforms, sorted([root_read])) self.assertEqual(len(self.visitor.step_names), 3) self.assertEqual(len(self.visitor.views), 1) self.assertTrue(isinstance(self.visitor.views[0], http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 0c35d99..662c61d 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -49,7 +49,6 @@ class TransformEvaluatorRegistry(object): self._evaluation_context = evaluation_context self._evaluators = { io.Read: _BoundedReadEvaluator, - core.Create: _CreateEvaluator, core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator, @@ -233,36 +232,6 @@ class _FlattenEvaluator(_TransformEvaluator): self._applied_ptransform, bundles, None, None, None, None) -class _CreateEvaluator(_TransformEvaluator): - """TransformEvaluator for Create transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not input_committed_bundle - assert not side_inputs - super(_CreateEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - def start_bundle(self): - assert len(self._outputs) == 1 - output_pcollection = list(self._outputs)[0] - self.bundle = self._evaluation_context.create_bundle(output_pcollection) - - def finish_bundle(self): - bundles = [] - transform = self._applied_ptransform.transform - - assert transform.value is not None - create_result = [GlobalWindows.windowed_value(v) for v in transform.value] - for result in create_result: - self.bundle.output(result) - bundles.append(self.bundle) - - return TransformResult( - self._applied_ptransform, bundles, None, None, None, None) - - class _TaggedReceivers(dict): """Received ParDo output and redirect to the associated output bundle.""" http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 88fdec8..cf313d1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1088,7 +1088,6 @@ class GroupByKey(PTransform): # This code path is only used in the local direct runner. For Dataflow # runner execution, the GroupByKey transform is expanded on the service. input_type = pcoll.element_type - if input_type is not None: # Initialize type-hints used below to enforce type-checking and to pass # downstream to further PTransforms. @@ -1373,11 +1372,100 @@ class Create(PTransform): def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline - return pvalue.PCollection(self.pipeline) + ouput_type = (self.get_type_hints().simple_output_type(self.label) or + self.infer_output_type(None)) + coder = typecoders.registry.get_coder(ouput_type) + source = self._create_source_from_iterable(self.value, coder) + return pbegin.pipeline | Read(source).with_output_types(ouput_type) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) + @staticmethod + def _create_source_from_iterable(values, coder): + return Create._create_source(map(coder.encode, values), coder) + + @staticmethod + def _create_source(serialized_values, coder): + from apache_beam.io import iobase + + class _CreateSource(iobase.BoundedSource): + def __init__(self, serialized_values, coder): + self._coder = coder + self._serialized_values = [] + self._total_size = 0 + self._serialized_values = serialized_values + self._total_size = sum(map(len, self._serialized_values)) + + def read(self, range_tracker): + start_position = range_tracker.start_position() + current_position = start_position + + def split_points_unclaimed(stop_position): + if current_position >= stop_position: + return 0 + else: + return stop_position - current_position - 1 + + range_tracker.set_split_points_unclaimed_callback( + split_points_unclaimed) + element_iter = iter(self._serialized_values[start_position:]) + for i in range(start_position, range_tracker.stop_position()): + if not range_tracker.try_claim(i): + return + current_position = i + yield self._coder.decode(next(element_iter)) + + def split(self, desired_bundle_size, start_position=None, + stop_position=None): + from apache_beam.io import iobase + + if len(self._serialized_values) < 2: + yield iobase.SourceBundle( + weight=0, source=self, start_position=0, + stop_position=len(self._serialized_values)) + else: + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = len(self._serialized_values) + + avg_size_per_value = self._total_size / len(self._serialized_values) + num_values_per_split = max( + int(desired_bundle_size / avg_size_per_value), 1) + + start = start_position + while start < stop_position: + end = min(start + num_values_per_split, stop_position) + remaining = stop_position - end + # Avoid having a too small bundle at the end. + if remaining < (num_values_per_split / 4): + end = stop_position + + sub_source = Create._create_source( + self._serialized_values[start:end], self._coder) + + yield iobase.SourceBundle(weight=(end - start), + source=sub_source, + start_position=0, + stop_position=(end - start)) + + start = end + + def get_range_tracker(self, start_position, stop_position): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = len(self._serialized_values) + + from apache_beam import io + return io.OffsetRangeTracker(start_position, stop_position) + + def estimate_size(self): + return self._total_size + + return _CreateSource(serialized_values, coder) + def Read(*args, **kwargs): from apache_beam import io http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/create_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py new file mode 100644 index 0000000..f4b1f07 --- /dev/null +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -0,0 +1,121 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the Create and _CreateSource classes.""" +import unittest + +from apache_beam.io import source_test_utils + +from apache_beam import Create, assert_that, equal_to +from apache_beam.coders import FastPrimitivesCoder +from apache_beam.test_pipeline import TestPipeline + + +class CreateTest(unittest.TestCase): + def setUp(self): + self.coder = FastPrimitivesCoder() + + def test_create_transform(self): + with TestPipeline() as p: + assert_that(p | Create(range(10)), equal_to(range(10))) + + def test_create_source_read(self): + self.check_read([], self.coder) + self.check_read([1], self.coder) + # multiple values. + self.check_read(range(10), self.coder) + + def check_read(self, values, coder): + source = Create._create_source_from_iterable(values, coder) + read_values = source_test_utils.readFromSource(source) + self.assertEqual(sorted(values), sorted(read_values)) + + def test_create_source_read_with_initial_splits(self): + self.check_read_with_initial_splits([], self.coder, num_splits=2) + self.check_read_with_initial_splits([1], self.coder, num_splits=2) + values = range(8) + # multiple values with a single split. + self.check_read_with_initial_splits(values, self.coder, num_splits=1) + # multiple values with a single split with a large desired bundle size + self.check_read_with_initial_splits(values, self.coder, num_splits=0.5) + # multiple values with many splits. + self.check_read_with_initial_splits(values, self.coder, num_splits=3) + # multiple values with uneven sized splits. + self.check_read_with_initial_splits(values, self.coder, num_splits=4) + # multiple values with num splits equal to num values. + self.check_read_with_initial_splits(values, self.coder, + num_splits=len(values)) + # multiple values with num splits greater than to num values. + self.check_read_with_initial_splits(values, self.coder, num_splits=30) + + def check_read_with_initial_splits(self, values, coder, num_splits): + """A test that splits the given source into `num_splits` and verifies that + the data read from original source is equal to the union of the data read + from the split sources. + """ + source = Create._create_source_from_iterable(values, coder) + desired_bundle_size = source._total_size / num_splits + splits = source.split(desired_bundle_size) + splits_info = [ + (split.source, split.start_position, split.stop_position) + for split in splits] + source_test_utils.assertSourcesEqualReferenceSource((source, None, None), + splits_info) + + def test_create_source_read_reentrant(self): + source = Create._create_source_from_iterable(range(9), self.coder) + source_test_utils.assertReentrantReadsSucceed((source, None, None)) + + def test_create_source_read_reentrant_with_initial_splits(self): + source = Create._create_source_from_iterable(range(24), self.coder) + for split in source.split(desired_bundle_size=5): + source_test_utils.assertReentrantReadsSucceed((split.source, + split.start_position, + split.stop_position)) + + def test_create_source_dynamic_splitting(self): + # 2 values + source = Create._create_source_from_iterable(range(2), self.coder) + source_test_utils.assertSplitAtFractionExhaustive(source) + # Multiple values. + source = Create._create_source_from_iterable(range(11), self.coder) + source_test_utils.assertSplitAtFractionExhaustive( + source, perform_multi_threaded_test=True) + + def test_create_source_progress(self): + num_values = 10 + source = Create._create_source_from_iterable(range(num_values), self.coder) + splits = [split for split in source.split(desired_bundle_size=100)] + assert len(splits) == 1 + fraction_consumed_report = [] + split_points_report = [] + range_tracker = splits[0].source.get_range_tracker( + splits[0].start_position, splits[0].stop_position) + for _ in splits[0].source.read(range_tracker): + fraction_consumed_report.append(range_tracker.fraction_consumed()) + split_points_report.append(range_tracker.split_points()) + + self.assertEqual( + [float(i) / num_values for i in range(num_values)], + fraction_consumed_report) + + expected_split_points_report = [ + ((i - 1), num_values - (i - 1)) + for i in range(1, num_values + 1)] + + self.assertEqual( + expected_split_points_report, split_points_report) http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 5889ab5..cb1dd77 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -58,7 +58,7 @@ class PTransformTest(unittest.TestCase): pa = TestPipeline() res = pa | 'ALabel' >> beam.Create([1, 2]) - self.assertEqual('AppliedPTransform(ALabel, Create)', + self.assertEqual('AppliedPTransform(ALabel/Read, Read)', str(res.producer)) pc = TestPipeline() @@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase): inputs_tr = res.producer.transform inputs_tr.inputs = ('ci',) self.assertEqual( - """<Create(PTransform) label=[Create] inputs=('ci',)>""", + """<Read(PTransform) label=[Read] inputs=('ci',)>""", str(inputs_tr)) pd = TestPipeline() @@ -74,12 +74,12 @@ class PTransformTest(unittest.TestCase): side_tr = res.producer.transform side_tr.side_inputs = (4,) self.assertEqual( - '<Create(PTransform) label=[Create] side_inputs=(4,)>', + '<Read(PTransform) label=[Read] side_inputs=(4,)>', str(side_tr)) inputs_tr.side_inputs = ('cs',) self.assertEqual( - """<Create(PTransform) label=[Create] """ + """<Read(PTransform) label=[Read] """ """inputs=('ci',) side_inputs=('cs',)>""", str(inputs_tr)) @@ -689,7 +689,7 @@ class PTransformLabelsTest(unittest.TestCase): def check_label(self, ptransform, expected_label): pipeline = TestPipeline() pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform - actual_label = sorted(pipeline.applied_labels - {'Start'})[0] + actual_label = sorted(pipeline.applied_labels - {'Start', 'Start/Read'})[0] self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label)) def test_default_labels(self): @@ -707,7 +707,7 @@ class PTransformLabelsTest(unittest.TestCase): self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)') - def test_lable_propogation(self): + def test_label_propogation(self): self.check_label('TestMap' >> beam.Map(len), r'TestMap') self.check_label('TestLambda' >> beam.Map(lambda x: x), r'TestLambda') self.check_label('TestFlatMap' >> beam.FlatMap(list), r'TestFlatMap') @@ -1058,7 +1058,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # aliased to Tuple[int, str]. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | (beam.Create(range(5)) + | (beam.Create([[1], [2]]) .with_output_types(typehints.Iterable[int])) | 'T' >> beam.GroupByKey())