Repository: incubator-beam Updated Branches: refs/heads/python-sdk b6bb97f57 -> 4006f7779
Adds more code snippets. Adds code snippets for sources, sinks, and joining using side inputs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22ff002f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22ff002f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22ff002f Branch: refs/heads/python-sdk Commit: 22ff002f385141c9e7d98a04f2edb817f43623a0 Parents: b6bb97f Author: Chamikara Jayalath <chamik...@apache.org> Authored: Sat Jun 25 01:53:12 2016 -0700 Committer: Chamikara Jayalath <chamik...@google.com> Committed: Mon Jul 11 22:49:11 2016 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/__init__.py | 16 ++ .../apache_beam/examples/snippets/snippets.py | 195 +++++++++++++++++++ .../examples/snippets/snippets_test.py | 73 +++++++ 3 files changed, 284 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/__init__.py b/sdks/python/apache_beam/examples/snippets/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index d84deea..57ddbee 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -609,6 +609,162 @@ def examples_wordcount_debugging(renames): p.run() +def model_custom_source(count): + """Demonstrates creating a new custom source and using it in a pipeline. + + Defines a new source 'CountingSource' that produces integers starting from 0 + up to a given size. + + Uses the new source in an example pipeline. + + Args: + count: the size of the counting source to be used in the pipeline + demonstrated in this method. + """ + + import apache_beam as beam + from apache_beam.io import iobase + from apache_beam.io.range_trackers import OffsetRangeTracker + from apache_beam.utils.options import PipelineOptions + + # Defining a new source.
+ # [START model_custom_source_new_source] + class CountingSource(iobase.BoundedSource): # Produces the integers in [0, count). + + def __init__(self, count): + self._count = count + + def estimate_size(self): + return self._count + + def get_range_tracker(self, start_position, stop_position): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + return OffsetRangeTracker(start_position, stop_position) + + def read(self, range_tracker): # Only this tracker's range, so split bundles never overlap. + for i in range(range_tracker.start_position(), range_tracker.stop_position()): + if not range_tracker.try_claim(i): + return + yield i + + def split(self, desired_bundle_size, start_position=None, + stop_position=None): # Bundles never extend past stop_position. + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + bundle_start = start_position + while bundle_start < stop_position: + bundle_stop = min(stop_position, bundle_start + desired_bundle_size) + yield iobase.SourceBundle(weight=(bundle_stop - bundle_start), + source=self, + start_position=bundle_start, + stop_position=bundle_stop) + bundle_start = bundle_stop + # [END model_custom_source_new_source] + + # Using the source in an example pipeline. + # [START model_custom_source_use_new_source] + p = beam.Pipeline(options=PipelineOptions()) + numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count)) + # [END model_custom_source_use_new_source] + + lines = numbers | beam.core.Map(lambda number: 'line %d' % number) + beam.assert_that( + lines, beam.equal_to( + ['line ' + str(number) for number in range(0, count)])) + + p.run() + + +def model_custom_sink(simplekv, KVs, final_table_name): + """Demonstrates creating a new custom sink and using it in a pipeline. + + Defines a new sink 'SimpleKVSink' that demonstrates writing to a simple + key-value based storage system. + + Uses the new sink in an example pipeline. + + Args: + simplekv: an object that mocks the key-value storage. The API of the + key-value storage consists of the following methods.
+ simplekv.connect(url) - + connects to the storage and returns an access token + which can be used to perform further operations + simplekv.open_table(access_token, table_name) - + creates a table named 'table_name'. Returns a table object. + simplekv.write_to_table(access_token, table, key, value) - + writes a key-value pair to the given table. + simplekv.rename_table(access_token, old_name, new_name) - + renames the table named 'old_name' to 'new_name'. + KVs: the set of key-value pairs to be written in the example pipeline. + final_table_name: the prefix of the final set of tables to be created by the + example pipeline. + """ + + import apache_beam as beam + from apache_beam.io import iobase + from apache_beam.utils.options import PipelineOptions + + # Defining the new sink. + # [START model_custom_sink_new_sink] + class SimpleKVSink(iobase.Sink): + + def __init__(self, url, final_table_name): + self._url = url + self._final_table_name = final_table_name + + def initialize_write(self): + access_token = simplekv.connect(self._url) + return access_token + + def open_writer(self, access_token, uid): + table_name = 'table' + uid + return SimpleKVWriter(access_token, table_name) + + def finalize_write(self, access_token, table_names): + for i, table_name in enumerate(table_names): + simplekv.rename_table( + access_token, table_name, self._final_table_name + str(i)) + # [END model_custom_sink_new_sink] + + # Defining a writer for the new sink. + # [START model_custom_sink_new_writer] + class SimpleKVWriter(iobase.Writer): + + def __init__(self, access_token, table_name): + self._access_token = access_token + self._table_name = table_name + self._table = simplekv.open_table(access_token, table_name) + + def write(self, record): + key, value = record + + simplekv.write_to_table(self._access_token, self._table, key, value) + + def close(self): + return self._table_name + # [END model_custom_sink_new_writer] + + # Using the new sink in an example pipeline.
+ # [START model_custom_sink_use_new_sink] + p = beam.Pipeline(options=PipelineOptions()) + kvs = p | beam.core.Create( + 'CreateKVs', KVs) + + kvs | beam.io.Write('WriteToSimpleKV', + SimpleKVSink('http://url_to_simple_kv/', + final_table_name)) + # [END model_custom_sink_use_new_sink] + + p.run() + + def model_textio(renames): """Using a Read and Write transform to read/write text files. @@ -859,6 +1015,45 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path): p.run() +def model_join_using_side_inputs( + name_list, email_list, phone_list, output_path): + """Joining PCollections using side inputs.""" + + import apache_beam as beam + from apache_beam.pvalue import AsIter + from apache_beam.utils.options import PipelineOptions + + p = beam.Pipeline(options=PipelineOptions()) + # [START model_join_using_side_inputs] + # This code performs a join by receiving the set of names as an input and + # passing PCollections that contain emails and phone numbers as side inputs + # instead of using CoGroupByKey.
+ names = p | beam.Create('names', name_list) + emails = p | beam.Create('email', email_list) + phones = p | beam.Create('phone', phone_list) + + def join_info(name, emails, phone_numbers): + filtered_emails = [] + for name_in_list, email in emails: + if name_in_list == name: + filtered_emails.append(email) + + filtered_phone_numbers = [] + for name_in_list, phone_number in phone_numbers: + if name_in_list == name: + filtered_phone_numbers.append(phone_number) + + return '; '.join(['%s' % name, + '%s' % ','.join(filtered_emails), + '%s' % ','.join(filtered_phone_numbers)]) + + contact_lines = names | beam.core.Map( + "CreateContacts", join_info, AsIter(emails), AsIter(phones)) + # [END model_join_using_side_inputs] + contact_lines | beam.io.Write(beam.io.TextFileSink(output_path)) + p.run() + + # [START model_library_transforms_keys] class Keys(beam.PTransform): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 6e1045f..eaf1b3a 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -17,7 +17,9 @@ """Tests for all code snippets used in public docs.""" +import glob import logging +import os import sys import tempfile import unittest @@ -234,6 +236,8 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_missing_define_numbers] # Consider the following code.
+ # pylint: disable=expression-not-assigned + # pylint: disable=unused-variable # [START type_hints_missing_apply] evens = numbers | beam.Filter(lambda x: x % 2 == 0) # [END type_hints_missing_apply] @@ -279,11 +283,13 @@ class TypeHintsTest(unittest.TestCase): words_with_lens = words | MyTransform() # [END type_hints_transform] + # pylint: disable=expression-not-assigned with self.assertRaises(typehints.TypeCheckError): words_with_lens | beam.Map(lambda x: x).with_input_types( beam.typehints.Tuple[int, int]) def test_runtime_checks_off(self): + # pylint: disable=expression-not-assigned p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv) # [START type_hints_runtime_off] p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) @@ -291,6 +297,7 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_off] def test_runtime_checks_on(self): + # pylint: disable=expression-not-assigned p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv) with self.assertRaises(typehints.TypeCheckError): # [START type_hints_runtime_on] @@ -379,6 +386,62 @@ class SnippetsTest(unittest.TestCase): self.get_output(result_path), ['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy']) + def test_model_custom_source(self): + snippets.model_custom_source(100) + + def test_model_custom_sink(self): + tempdir_name = tempfile.mkdtemp() + + class SimpleKV(object): # Mocks the store with one file per table under tmp_dir. + def __init__(self, tmp_dir): + self._dummy_token = 'dummy_token' + self._tmp_dir = tmp_dir + + def connect(self, url): + return self._dummy_token + + def open_table(self, access_token, table_name): + assert access_token == self._dummy_token + file_name = self._tmp_dir + os.sep + table_name + assert not os.path.exists(file_name) + open(file_name, 'wb').close() + return table_name + + def write_to_table(self, access_token, table_name, key, value): + assert access_token == self._dummy_token + file_name = self._tmp_dir + os.sep + table_name + assert os.path.exists(file_name) + with open(file_name,
'ab') as f: + f.write(key + ':' + value + os.linesep) + + def rename_table(self, access_token, old_name, new_name): + assert access_token == self._dummy_token + old_file_name = self._tmp_dir + os.sep + old_name + new_file_name = self._tmp_dir + os.sep + new_name + assert os.path.isfile(old_file_name) + assert not os.path.exists(new_file_name) + + os.rename(old_file_name, new_file_name) + + snippets.model_custom_sink( + SimpleKV(tempdir_name), + [('key' + str(i), 'value' + str(i)) for i in range(100)], + 'final_table') + + glob_pattern = tempdir_name + os.sep + 'final_table*' # Tables renamed by finalize_write. + output_files = glob.glob(glob_pattern) + self.assertTrue(output_files, 'no final tables were written') + + received_output = [] + for file_name in output_files: + with open(file_name) as f: + for line in f: + received_output.append(line.rstrip(os.linesep)) + expected_output = [ + 'key' + str(i) + ':' + 'value' + str(i) for i in range(100)] + + self.assertItemsEqual(expected_output, received_output) + def test_model_textio(self): temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc') result_path = temp_path + '.result' @@ -480,6 +543,16 @@ class SnippetsTest(unittest.TestCase): expect = ['a; a...@example.com; x4312', 'b; b...@example.com; x8452'] self.assertEqual(expect, self.get_output(result_path)) + def test_model_join_using_side_inputs(self): + name_list = ['a', 'b'] + email_list = [['a', 'a...@example.com'], ['b', 'b...@example.com']] + phone_list = [['a', 'x4312'], ['b', 'x8452']] + result_path = self.create_temp_file() + snippets.model_join_using_side_inputs( + name_list, email_list, phone_list, result_path) + expect = ['a; a...@example.com; x4312', 'b; b...@example.com; x8452'] + self.assertEqual(expect, self.get_output(result_path)) + class CombineTest(unittest.TestCase): """Tests for dataflow/model/combine."""