Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk c8cef2cba -> a1a51c3c1
Update some of the example tests to use assert_that


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fef464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fef464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fef464

Branch: refs/heads/python-sdk
Commit: 84fef464b0abea41e318c0fe983ac43874e5f6ad
Parents: c8cef2c
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 13 16:28:46 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 17:37:02 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      | 34 ++-------
 .../examples/complete/estimate_pi.py            | 19 +++--
 .../examples/complete/estimate_pi_test.py       | 36 +++++-----
 .../examples/cookbook/coders_test.py            | 33 +++------
 .../examples/cookbook/custom_ptransform.py      | 74 ++++++++++----------
 .../examples/cookbook/custom_ptransform_test.py | 41 ++++-------
 .../examples/cookbook/group_with_coder_test.py  | 70 ++++++++----------
 sdks/python/apache_beam/transforms/util.py      | 13 ++++
 8 files changed, 139 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index bd0a6cb..1b3ee5f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -15,43 +15,17 @@
 # limitations under the License.
 #
 
-"""Test for the wordcount example."""
+"""Test for the autocomplete example."""
 
-import collections
 import unittest
-
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
-from apache_beam.pvalue import AsIter
-
-# TODO(robertwb): Move to testing utilities.
-
-
-def assert_that(pcoll, matcher):
-  """Asserts that the give PCollection satisfies the constraints of the matcher
-  in a way that is runnable locally or on a remote service.
- """ - singleton = pcoll.pipeline | beam.Create('create_singleton', [None]) - - def check_matcher(_, side_value): - assert matcher(side_value) - return [] - singleton | beam.FlatMap(check_matcher, AsIter(pcoll)) # pylint: disable=expression-not-assigned - - -def contains_in_any_order(expected): - def matcher(value): - vs = collections.Counter(value) - es = collections.Counter(expected) - if vs != es: - raise ValueError( - 'extra: %s, missing: %s' % (vs - es, es - vs)) - return True - return matcher +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import contains_in_any_order -class WordCountTest(unittest.TestCase): +class AutocompleteTest(unittest.TestCase): WORDS = ['this', 'this', 'that', 'to', 'to', 'to'] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 8b0f202..3c4a2d9 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -85,6 +85,20 @@ class JsonCoder(object): return json.dumps(x) +class EstimatePiTransform(beam.PTransform): + """Runs 10M trials, and combine the results to estimate pi.""" + + def __init__(self, label): + super(EstimatePiTransform, self).__init__(label) + + def apply(self, pcoll): + # A hundred work items of a hundred thousand tries each. + return (pcoll + | beam.Create('Initialize', [100000] * 100).with_output_types(int) + | beam.Map('Run trials', run_trials) + | beam.CombineGlobally('Sum', combine_results).without_defaults()) + + def run(argv=None): parser = argparse.ArgumentParser() @@ -94,11 +108,8 @@ def run(argv=None): known_args, pipeline_args = parser.parse_known_args(argv) p = beam.Pipeline(argv=pipeline_args) - # A thousand work items of a million tries each. 
   (p  # pylint: disable=expression-not-assigned
-   | beam.Create('Initialize', [100000] * 100).with_output_types(int)
-   | beam.Map('Run trials', run_trials)
-   | beam.CombineGlobally('Sum', combine_results).without_defaults()
+   | EstimatePiTransform('Estimate')
    | beam.io.Write('Write', beam.io.TextFileSink(known_args.output,
                                                  coder=JsonCoder())))


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index ebebadc..7ca82d7 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -17,31 +17,33 @@
 
 """Test for the estimate_pi example."""
 
-import json
-import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.complete import estimate_pi
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import DataflowAssertException
 
 
-class EstimatePiTest(unittest.TestCase):
+def in_between(lower, upper):
+  def _in_between(actual):
+    _, _, estimate = actual[0]
+    if estimate < lower or estimate > upper:
+      raise DataflowAssertException(
+          'Failed assert: %f not in [%f, %f]' % (estimate, lower, upper))
+  return _in_between
+
 
-  def create_temp_file(self, contents):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
-      return f.name
+class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
-    temp_path = self.create_temp_file('result')
-    estimate_pi.run([
-        '--output=%s' % temp_path])
-    # Parse result file and compare.
-    with open(temp_path + '-00000-of-00001') as result_file:
-      estimated_pi = json.loads(result_file.readline())[2]
-    # Note: Probabilistically speaking this test can fail with a probability
-    # that is very small (VERY) given that we run at least 10 million trials.
-    self.assertTrue(estimated_pi > 3.13 and estimated_pi < 3.15)
+    p = beam.Pipeline('DirectPipelineRunner')
+    result = p | estimate_pi.EstimatePiTransform('Estimate')
+
+    # Note: Probabilistically speaking this test can fail with a probability
+    # that is very small (VERY) given that we run at least 10 million trials.
+    assert_that(result, in_between(3.13, 3.15))
+    p.run()
 
 
 if __name__ == '__main__':


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 904a967..5840081 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -17,12 +17,13 @@
 
 """Test for the coders example."""
 
-import json
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import coders
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 
 
 class CodersTest(unittest.TestCase):
@@ -32,26 +33,14 @@ class CodersTest(unittest.TestCase):
       {'host': ['Germany', 1], 'guest': ['Brasil', 3]},
       {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
 
-  def create_temp_file(self, records):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      for record in records:
-        f.write('%s\n' % json.dumps(record))
-      return f.name
-
-  def test_basics(self):
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    coders.run([
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        results.append(json.loads(line))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([['Italy', 0], ['Brasil', 6], ['Germany', 3]]))
+  def test_compute_points(self):
+    p = beam.Pipeline('DirectPipelineRunner')
+    records = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (records
+              | beam.FlatMap('points', coders.compute_points)
+              | beam.CombinePerKey(sum))
+    assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
+    p.run()
 
 
 if __name__ == '__main__':


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 8da1f43..d3d8b08 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -33,69 +33,67 @@ from apache_beam.utils.options import PipelineOptions
 # pylint doesn't understand our pipeline syntax:
 # pylint:disable=expression-not-assigned
 
+class Count1(beam.PTransform):
+  """Count as a subclass of PTransform, with an apply method."""
 
-def run_count1(known_args, options):
-  """Runs the first example pipeline."""
-
-  class Count(beam.PTransform):
-    """Count as a subclass of PTransform, with an apply method."""
+  def apply(self, pcoll):
+    return (
+        pcoll
+        | beam.Map('Init', lambda v: (v, 1))
+        | beam.CombinePerKey(sum))
 
-    def apply(self, pcoll):
-      return (
-          pcoll
-          | beam.Map('Init', lambda v: (v, 1))
-          | beam.CombinePerKey(sum))
 
+def run_count1(known_args, options):
+  """Runs the first example pipeline."""
   logging.info('Running first pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count()
+  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 
-def run_count2(known_args, options):
-  """Runs the second example pipeline."""
+@beam.ptransform_fn
+def Count2(pcoll):  # pylint: disable=invalid-name
+  """Count as a decorated function."""
+  return (
+      pcoll
+      | beam.Map('Init', lambda v: (v, 1))
+      | beam.CombinePerKey(sum))
 
-  @beam.ptransform_fn
-  def Count(pcoll):  # pylint: disable=invalid-name
-    """Count as a decorated function."""
-    return (
-        pcoll
-        | beam.Map('Init', lambda v: (v, 1))
-        | beam.CombinePerKey(sum))
 
+def run_count2(known_args, options):
+  """Runs the second example pipeline."""
   logging.info('Running second pipeline')
   p = beam.Pipeline(options=options)
   (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
-   | Count()  # pylint: disable=no-value-for-parameter
+   | Count2()  # pylint: disable=no-value-for-parameter
   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 
-def run_count3(known_args, options):
-  """Runs the third example pipeline."""
+@beam.ptransform_fn
+def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
+  """Count as a decorated function with a side input.
 
-  @beam.ptransform_fn
-  # pylint: disable=invalid-name
-  def Count(pcoll, factor=1):
-    """Count as a decorated function with a side input.
+  Args:
+    pcoll: the PCollection passed in from the previous transform
+    factor: the amount by which to count
 
-    Args:
-      pcoll: the PCollection passed in from the previous transform
-      factor: the amount by which to count
+  Returns:
+    A PCollection counting the number of times each unique element occurs.
+  """
+  return (
+      pcoll
+      | beam.Map('Init', lambda v: (v, factor))
+      | beam.CombinePerKey(sum))
 
-    Returns:
-      A PCollection counting the number of times each unique element occurs.
-    """
-    return (
-        pcoll
-        | beam.Map('Init', lambda v: (v, factor))
-        | beam.CombinePerKey(sum))
 
+def run_count3(known_args, options):
+  """Runs the third example pipeline."""
   logging.info('Running third pipeline')
   p = beam.Pipeline(options=options)
   (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
-   | Count(2)  # pylint: disable=no-value-for-parameter
+   | Count3(2)  # pylint: disable=no-value-for-parameter
   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 875a99f..3c0c6f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -18,48 +18,33 @@
 """Tests for the various custom Count implementation examples."""
 
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 
 
 class CustomCountTest(unittest.TestCase):
 
   def test_count1(self):
-    self.run_pipeline(custom_ptransform.run_count1)
+    self.run_pipeline(custom_ptransform.Count1())
 
   def test_count2(self):
-    self.run_pipeline(custom_ptransform.run_count2)
+    self.run_pipeline(custom_ptransform.Count2())
 
   def test_count3(self):
-    self.run_pipeline(custom_ptransform.run_count3, factor=2)
+    factor = 2
+    self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
 
   def run_pipeline(self, count_implementation, factor=1):
-    input_path = self.create_temp_file('CAT\nDOG\nCAT\nCAT\nDOG\n')
-    output_path = input_path + '.result'
-
-    known_args, pipeline_args = custom_ptransform.get_args([
-        '--input=%s*' % input_path, '--output=%s' % output_path
-    ])
-
-    count_implementation(known_args, PipelineOptions(pipeline_args))
-    self.assertEqual(["(u'CAT', %d)" % (3 * factor),
-                      "(u'DOG', %d)" % (2 * factor)],
-                     self.get_output(output_path + '-00000-of-00001'))
-
-  def create_temp_file(self, contents=''):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
-      return f.name
-
-  def get_output(self, path):
-    logging.info('Reading output from "%s"', path)
-    lines = []
-    with open(path) as f:
-      lines = f.readlines()
-    return sorted(s.rstrip('\n') for s in lines)
+    p = beam.Pipeline('DirectPipelineRunner')
+    words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    result = words | count_implementation
+    assert_that(
+        result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
+    p.run()
 
 
 if __name__ == '__main__':


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index fb52809..07211a9 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -18,10 +18,13 @@
 """Test for the custom coders example."""
 
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
 from apache_beam.examples.cookbook import group_with_coder
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+from apache_beam.utils.options import OptionsContext
 
 
 # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
@@ -36,54 +39,37 @@ class GroupWithCoderTest(unittest.TestCase):
       'joe,20', 'fred,6', 'ann,5',
       'joe,30', 'ann,10', 'mary,1']
 
-  def create_temp_file(self, records):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      for record in records:
-        f.write('%s\n' % record)
-      return f.name
-
-  def test_basics_with_type_check(self):
-    # Run the workflow with --pipeline_type_check option. This will make sure
+  @OptionsContext(pipeline_type_check=True)
+  def test_basics_with_type_check_n(self):
+    # Run the workflow with pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have non-default values
     # and therefore any custom coders will be used. In our case we want to make
    # sure the coder for the Player class will be used.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('x:ann', 15), ('x:fred', 9), ('x:joe', 60), ('x:mary', 8)]))
+    p = beam.Pipeline('DirectPipelineRunner')
+    data = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (data
+              | beam.Map('get players', group_with_coder.get_players)
+              | beam.CombinePerKey(sum)
+              | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)))
+    assert_that(result, equal_to(
+        ['x:ann,15', 'x:fred,9', 'x:joe,60', 'x:mary,8']))
+    p.run()
 
-  def test_basics_without_type_check(self):
-    # Run the workflow without --pipeline_type_check option. This will make sure
+  @OptionsContext(pipeline_type_check=False)
+  def test_basics_without_type_check_n(self):
+    # Run the workflow without pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have default values and
     # therefore any custom coders will not be used. The default coder (pickler)
     # will be used instead.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--no_pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
-
+    p = beam.Pipeline('DirectPipelineRunner')
+    data = p | beam.Create('create', self.SAMPLE_RECORDS)
+    result = (data
+              | beam.Map('get players', group_with_coder.get_players)
+              | beam.CombinePerKey(sum)
+              | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)))
+    assert_that(result, equal_to(
+        ['ann,15', 'fred,9', 'joe,60', 'mary,8']))
+    p.run()
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index b7a121d..59f4338 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,6 +20,8 @@
 
 from __future__ import absolute_import
 
+import collections
+
 from apache_beam.pvalue import AsIter as AllOf
 from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
 from apache_beam.transforms.ptransform import PTransform
@@ -35,6 +37,7 @@ __all__ = [
     'assert_that',
     'equal_to',
     'is_empty',
+    'contains_in_any_order',
     ]
 
 
@@ -196,6 +199,16 @@ def is_empty():
   return _empty
 
 
+def contains_in_any_order(expected):
+  def _contains_in_any_order(actual):
+    vs = collections.Counter(actual)
+    es = collections.Counter(expected)
+    if vs != es:
+      raise DataflowAssertException(
+          'Failed assert: extra: %s, missing: %s' % (vs - es, es - vs))
+  return _contains_in_any_order
+
+
 def assert_that(actual, matcher, label='assert_that'):
   """A PTransform that checks a PCollection has an expected value.
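
----------------------------------------------------------------------

A minimal sketch of how the relocated utilities compose in a test, following
the pattern the updated tests above adopt. The runner name, assert_that
signature, and matcher come from the diff itself; the sample elements and the
'create' label are illustrative only, not part of the commit:

  import apache_beam as beam
  from apache_beam.transforms.util import assert_that
  from apache_beam.transforms.util import contains_in_any_order

  # Build a small pipeline on the direct runner and attach the assertion as
  # an extra pipeline step. Because assert_that is itself a PTransform, the
  # check executes with the pipeline, locally or on a remote service.
  p = beam.Pipeline('DirectPipelineRunner')
  words = p | beam.Create('create', ['to', 'this', 'to'])
  assert_that(words, contains_in_any_order(['this', 'to', 'to']))
  p.run()  # Raises DataflowAssertException if the matcher does not hold.

Since the matcher compares collections.Counter multisets, element order is
ignored but duplicate counts must match exactly.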