Support ValidatesRunner Attribute in Python This is roughly equivalent to "RunnableOnService" in the Java SDK. See BEAM-655
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81e7a0f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81e7a0f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81e7a0f6 Branch: refs/heads/python-sdk Commit: 81e7a0f653864212a5c9d3d0802608f92bb34501 Parents: 5ce75a2 Author: Mark Liu <[email protected]> Authored: Thu Nov 17 14:45:42 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Nov 29 15:43:04 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/dataflow_test.py | 66 +++++++++++++++-------- sdks/python/apache_beam/test_pipeline.py | 76 +++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/dataflow_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index f96e8af..ba3553a 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -24,13 +24,13 @@ import re import unittest import apache_beam as beam -from apache_beam.pipeline import Pipeline from apache_beam.pvalue import AsDict from apache_beam.pvalue import AsIter as AllOf from apache_beam.pvalue import AsList from apache_beam.pvalue import AsSingleton from apache_beam.pvalue import EmptySideInput from apache_beam.pvalue import SideOutputValue +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import Create from apache_beam.transforms import DoFn from apache_beam.transforms import FlatMap @@ -42,6 +42,7 @@ from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to from apache_beam.transforms.window 
import IntervalWindow from apache_beam.transforms.window import WindowFn +from nose.plugins.attrib import attr class DataflowTest(unittest.TestCase): @@ -58,8 +59,9 @@ class DataflowTest(unittest.TestCase): | 'GroupCounts' >> GroupByKey() | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones)))) + @attr('ValidatesRunner') def test_word_count(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) result = ( (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) @@ -67,8 +69,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) pipeline.run() + @attr('ValidatesRunner') def test_map(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() lines = pipeline | 'input' >> Create(['a', 'b', 'c']) result = (lines | 'upper' >> Map(str.upper) @@ -76,8 +79,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_side_input_as_arg(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in @@ -89,8 +93,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_side_input_as_keyword_arg(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' @@ -102,6 +107,7 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_do_fn_object(self): class SomeDoFn(DoFn): """A custom DoFn for a 
FlatMap transform.""" @@ -109,7 +115,7 @@ class DataflowTest(unittest.TestCase): def process(self, context, prefix, suffix): return ['%s-%s-%s' % (prefix, context.element, suffix)] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' @@ -119,6 +125,7 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_multiple_outputs_and_using_yield(self): class SomeDoFn(DoFn): """A custom DoFn using yield.""" @@ -130,7 +137,7 @@ class DataflowTest(unittest.TestCase): else: yield SideOutputValue('odd', context.element) - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | ParDo( 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') @@ -139,6 +146,7 @@ class DataflowTest(unittest.TestCase): assert_that(results.even, equal_to([2, 4]), label='assert:even') pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_multiple_outputs_and_using_return(self): def some_fn(v): if v % 2 == 0: @@ -146,7 +154,7 @@ class DataflowTest(unittest.TestCase): else: return [v, SideOutputValue('odd', v)] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | FlatMap( 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') @@ -155,8 +163,9 @@ class DataflowTest(unittest.TestCase): assert_that(results.even, equal_to([2, 4]), label='assert:even') pipeline.run() + @attr('ValidatesRunner') def test_empty_singleton_side_input(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([]) # Empty side input. 
@@ -167,24 +176,27 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) pipeline.run() + @attr('ValidatesRunner') def test_multi_valued_singleton_side_input(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned with self.assertRaises(ValueError): pipeline.run() + @attr('ValidatesRunner') def test_default_value_singleton_side_input(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([]) # 0 values in side input. result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)) assert_that(result, equal_to([10, 20])) pipeline.run() + @attr('ValidatesRunner') def test_iterable_side_input(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. 
result = pcol | FlatMap('compute', @@ -192,8 +204,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to([3, 4, 6, 8])) pipeline.run() + @attr('ValidatesRunner') def test_undeclared_side_outputs(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | FlatMap( 'ClassifyNumbers', @@ -204,8 +217,9 @@ class DataflowTest(unittest.TestCase): assert_that(results.even, equal_to([2, 4]), label='assert:even') pipeline.run() + @attr('ValidatesRunner') def test_empty_side_outputs(self): - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> Create([1, 3, 5]) results = nums | FlatMap( 'ClassifyNumbers', @@ -216,10 +230,11 @@ class DataflowTest(unittest.TestCase): assert_that(results.even, equal_to([]), label='assert:even') pipeline.run() + @attr('ValidatesRunner') def test_as_list_and_as_dict_side_inputs(self): a_list = [5, 1, 3, 2, 9] some_pairs = [('crouton', 17), ('supreme', None)] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) side_pairs = pipeline | 'side pairs' >> Create(some_pairs) @@ -239,11 +254,12 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, a_list, some_pairs)) pipeline.run() + @attr('ValidatesRunner') def test_as_singleton_without_unique_labels(self): # This should succeed as calling AsSingleton on the same PCollection twice # with the same defaults will return the same PCollectionView. 
a_list = [2] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( @@ -262,11 +278,12 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, 2)) pipeline.run() + @attr('ValidatesRunner') def test_as_singleton_with_different_defaults_without_unique_labels(self): # This should fail as AsSingleton with distinct default values should create # distinct PCollectionViews with the same full_label. a_list = [2] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) @@ -280,9 +297,10 @@ class DataflowTest(unittest.TestCase): 'Transform "ViewAsSingleton(side list.None)" does not have a ' 'stable unique label.')) + @attr('ValidatesRunner') def test_as_singleton_with_different_defaults_with_unique_labels(self): a_list = [] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( @@ -302,11 +320,12 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, 2, 3)) pipeline.run() + @attr('ValidatesRunner') def test_as_list_without_unique_labels(self): # This should succeed as calling AsList on the same PCollection twice will # return the same PCollectionView. 
a_list = [1, 2, 3] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( @@ -325,9 +344,10 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, [1, 2, 3])) pipeline.run() + @attr('ValidatesRunner') def test_as_list_with_unique_labels(self): a_list = [1, 2, 3] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( @@ -346,9 +366,10 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, [1, 2, 3])) pipeline.run() + @attr('ValidatesRunner') def test_as_dict_with_unique_labels(self): some_kvs = [('a', 1), ('b', 2)] - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() main_input = pipeline | 'main input' >> Create([1]) side_kvs = pipeline | 'side kvs' >> Create(some_kvs) results = main_input | FlatMap( @@ -367,6 +388,7 @@ class DataflowTest(unittest.TestCase): assert_that(results, matcher(1, some_kvs)) pipeline.run() + @attr('ValidatesRunner') def test_window_transform(self): class TestWindowFn(WindowFn): """Windowing function adding two disjoint windows to each element.""" @@ -378,7 +400,7 @@ class DataflowTest(unittest.TestCase): def merge(self, existing_windows): return existing_windows - pipeline = Pipeline('DirectPipelineRunner') + pipeline = TestPipeline() numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)]) result = (numbers | 'W' >> WindowInto(windowfn=TestWindowFn()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/test_pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py new file mode 100644 index 0000000..be64a7a 
--- /dev/null +++ b/sdks/python/apache_beam/test_pipeline.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Test Pipeline, a wrapper of Pipeline for test purposes""" + +import argparse +import shlex + +from apache_beam.pipeline import Pipeline +from apache_beam.utils.options import PipelineOptions + + +class TestPipeline(Pipeline): + """TestPipeline class is used inside of Beam tests that can be configured to + run against a pipeline runner. + + It has functionality to parse arguments from the command line and build pipeline + options for tests that run against a pipeline runner and utilize resources + of the pipeline runner. Those test functions are recommended to be tagged with + the @attr("ValidatesRunner") annotation. + + In order to configure the test with customized pipeline options from the command + line, the system argument 'test-pipeline-options' can be used to obtain a list + of pipeline options. If no options are specified, default values will be used.
+ + For example, use the following command line to execute all ValidatesRunner tests:: + + python setup.py nosetests -a ValidatesRunner \ --test-pipeline-options="--runner=DirectPipelineRunner \ --job_name=myJobName \ --num_workers=1" + + For example, use assert_that for test validation:: + + pipeline = TestPipeline() + pcoll = ... + assert_that(pcoll, equal_to(...)) + pipeline.run() + + def __init__(self, runner=None, options=None, argv=None): + if options is None: + options = self.create_pipeline_opt_from_args() + super(TestPipeline, self).__init__(runner, options, argv) + + def create_pipeline_opt_from_args(self): + """Create pipeline options from the command line argument: + --test-pipeline-options + """ + parser = argparse.ArgumentParser() + parser.add_argument('--test-pipeline-options', + type=str, + action='store', + help='only run tests providing service options') + known, unused_argv = parser.parse_known_args() + + if known.test_pipeline_options: + options = shlex.split(known.test_pipeline_options) + else: + options = [] + + return PipelineOptions(options)
