Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 95a591e05 -> 67a769a9a
Made checksum_output optional in bigshuffle.py.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b782ded
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b782ded
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b782ded

Branch: refs/heads/python-sdk
Commit: 2b782deddefc4ddf437cc4623ccb461332f0fe20
Parents: 95a591e
Author: Marian Dvorsky <mari...@google.com>
Authored: Mon Jul 11 11:47:01 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Jul 12 14:26:26 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/cookbook/bigshuffle.py | 35 ++++++++++----------
 1 file changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b782ded/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 8cbaa40..692bd52 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -42,7 +42,6 @@ def run(argv=None):
                       required=True,
                       help='Output file pattern to write results to.')
   parser.add_argument('--checksum_output',
-                      required=True,
                       help='Checksum output file pattern.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
@@ -59,24 +58,26 @@ def run(argv=None):
                 'format',
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
-  input_csum = (lines
-                | beam.Map('input-csum', crc32line)
-                | beam.CombineGlobally('combine-input-csum', sum)
-                | beam.Map('hex-format', lambda x: '%x' % x))
-  input_csum | beam.io.Write(
-      'write-input-csum',
-      beam.io.TextFileSink(known_args.checksum_output + '-input'))
-
   # Write the output using a "Write" transform that has side effects.
   output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
-  # Write the output checksum
-  output_csum = (output
-                 | beam.Map('output-csum', crc32line)
-                 | beam.CombineGlobally('combine-output-csum', sum)
-                 | beam.Map('hex-format-output', lambda x: '%x' % x))
-  output_csum | beam.io.Write(
-      'write-output-csum',
-      beam.io.TextFileSink(known_args.checksum_output + '-output'))
+
+  # Optionally write the input and output checksums.
+  if known_args.checksum_output:
+    input_csum = (lines
+                  | beam.Map('input-csum', crc32line)
+                  | beam.CombineGlobally('combine-input-csum', sum)
+                  | beam.Map('hex-format', lambda x: '%x' % x))
+    input_csum | beam.io.Write(
+        'write-input-csum',
+        beam.io.TextFileSink(known_args.checksum_output + '-input'))
+
+    output_csum = (output
+                   | beam.Map('output-csum', crc32line)
+                   | beam.CombineGlobally('combine-output-csum', sum)
+                   | beam.Map('hex-format-output', lambda x: '%x' % x))
+    output_csum | beam.io.Write(
+        'write-output-csum',
+        beam.io.TextFileSink(known_args.checksum_output + '-output'))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()