This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c0b3fda [BEAM-7389] Created elementwise for consistency with docs new 50c5281 Merge pull request #9664 from davidcavazos/normalize-filenames c0b3fda is described below commit c0b3fdaf49d4c9f96da4570259ce4d85e4609422 Author: David Cavazos <dcava...@google.com> AuthorDate: Thu Oct 3 09:31:19 2019 -0700 [BEAM-7389] Created elementwise for consistency with docs --- .../snippets/transforms/elementwise/__init__.py | 18 ++ .../snippets/transforms/elementwise/filter.py | 182 +++++++++++++++ .../snippets/transforms/elementwise/filter_test.py | 81 +++++++ .../snippets/transforms/elementwise/flatmap.py | 248 +++++++++++++++++++++ .../transforms/elementwise/flatmap_test.py | 92 ++++++++ .../snippets/transforms/elementwise/keys.py | 42 ++++ .../snippets/transforms/elementwise/keys_test.py | 56 +++++ .../snippets/transforms/elementwise/kvswap.py | 42 ++++ .../snippets/transforms/elementwise/kvswap_test.py | 56 +++++ .../snippets/transforms/elementwise/map.py | 226 +++++++++++++++++++ .../snippets/transforms/elementwise/map_test.py | 90 ++++++++ .../snippets/transforms/elementwise/pardo.py | 126 +++++++++++ .../snippets/transforms/elementwise/pardo_test.py | 120 ++++++++++ .../snippets/transforms/elementwise/partition.py | 136 +++++++++++ .../transforms/elementwise/partition_test.py | 84 +++++++ .../snippets/transforms/elementwise/regex.py | 236 ++++++++++++++++++++ .../snippets/transforms/elementwise/regex_test.py | 173 ++++++++++++++ .../snippets/transforms/elementwise/tostring.py | 86 +++++++ .../transforms/elementwise/tostring_test.py | 105 +++++++++ .../snippets/transforms/elementwise/values.py | 42 ++++ .../snippets/transforms/elementwise/values_test.py | 56 +++++ .../transforms/elementwise/withtimestamps.py | 128 +++++++++++ .../transforms/elementwise/withtimestamps_test.py | 103 +++++++++ 23 files changed, 2528 insertions(+) diff --git 
a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py new file mode 100644 index 0000000..6569e3f --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py new file mode 100644 index 0000000..44b11b8 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py @@ -0,0 +1,182 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + + +def filter_function(test=None): + # [START filter_function] + import apache_beam as beam + + def is_perennial(plant): + return plant['duration'] == 'perennial' + + with beam.Pipeline() as pipeline: + perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Filter perennials' >> beam.Filter(is_perennial) + | beam.Map(print) + ) + # [END filter_function] + if test: + test(perennials) + + +def filter_lambda(test=None): + # [START filter_lambda] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Filter perennials' >> beam.Filter( + lambda plant: plant['duration'] == 'perennial') + | beam.Map(print) + ) + # [END filter_lambda] + if test: + test(perennials) + + +def filter_multiple_arguments(test=None): + # [START filter_multiple_arguments] + import apache_beam as beam
+ + def has_duration(plant, duration): + return plant['duration'] == duration + + with beam.Pipeline() as pipeline: + perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Filter perennials' >> beam.Filter(has_duration, 'perennial') + | beam.Map(print) + ) + # [END filter_multiple_arguments] + if test: + test(perennials) + + +def filter_side_inputs_singleton(test=None): + # [START filter_side_inputs_singleton] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + perennial = pipeline | 'Perennial' >> beam.Create(['perennial']) + + perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Filter perennials' >> beam.Filter( + lambda plant, duration: plant['duration'] == duration, + duration=beam.pvalue.AsSingleton(perennial), + ) + | beam.Map(print) + ) + # [END filter_side_inputs_singleton] + if test: + test(perennials) + + +def filter_side_inputs_iter(test=None): + # [START filter_side_inputs_iter] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + valid_durations = pipeline | 'Valid durations' >> beam.Create([ + 'annual', + 'biennial', + 'perennial', + ]) + + valid_plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration':
'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'}, + ]) + | 'Filter valid plants' >> beam.Filter( + lambda plant, valid_durations: plant['duration'] in valid_durations, + valid_durations=beam.pvalue.AsIter(valid_durations), + ) + | beam.Map(print) + ) + # [END filter_side_inputs_iter] + if test: + test(valid_plants) + + +def filter_side_inputs_dict(test=None): + # [START filter_side_inputs_dict] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + keep_duration = pipeline | 'Duration filters' >> beam.Create([ + ('annual', False), + ('biennial', False), + ('perennial', True), + ]) + + perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Filter plants by duration' >> beam.Filter( + lambda plant, keep_duration: keep_duration[plant['duration']], + keep_duration=beam.pvalue.AsDict(keep_duration), + ) + | beam.Map(print) + ) + # [END filter_side_inputs_dict] + if test: + test(perennials) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py new file mode 100644 index 0000000..d989e43 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py @@ -0,0 +1,81 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import filter + + +def check_perennials(actual): + # [START perennials] + perennials = [ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, + ] + # [END perennials] + assert_that(actual, equal_to(perennials)) + + +def check_valid_plants(actual): + # [START valid_plants] + valid_plants = [ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + ] + # [END valid_plants] + assert_that(actual, equal_to(valid_plants)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.filter.print', lambda elem: elem) +# pylint: enable=line-too-long +class FilterTest(unittest.TestCase): + def test_filter_function(self): + filter.filter_function(check_perennials) + + def test_filter_lambda(self): +
filter.filter_lambda(check_perennials) + + def test_filter_multiple_arguments(self): + filter.filter_multiple_arguments(check_perennials) + + def test_filter_side_inputs_singleton(self): + filter.filter_side_inputs_singleton(check_perennials) + + def test_filter_side_inputs_iter(self): + filter.filter_side_inputs_iter(check_valid_plants) + + def test_filter_side_inputs_dict(self): + filter.filter_side_inputs_dict(check_perennials) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py new file mode 100644 index 0000000..50ffe7a --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py @@ -0,0 +1,248 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def flatmap_simple(test=None): + # [START flatmap_simple] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '🍓Strawberry 🥕Carrot 🍆Eggplant', + '🍅Tomato 🥔Potato', + ]) + | 'Split words' >> beam.FlatMap(str.split) + | beam.Map(print) + ) + # [END flatmap_simple] + if test: + test(plants) + + +def flatmap_function(test=None): + # [START flatmap_function] + import apache_beam as beam + + def split_words(text): + return text.split(',') + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '🍓Strawberry,🥕Carrot,🍆Eggplant', + '🍅Tomato,🥔Potato', + ]) + | 'Split words' >> beam.FlatMap(split_words) + | beam.Map(print) + ) + # [END flatmap_function] + if test: + test(plants) + + +def flatmap_lambda(test=None): + # [START flatmap_lambda] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'], + ['🍅Tomato', '🥔Potato'], + ]) + | 'Flatten lists' >> beam.FlatMap(lambda elements: elements) + | beam.Map(print) + ) + # [END flatmap_lambda] + if test: + test(plants) + + +def flatmap_generator(test=None): + # [START flatmap_generator] + import apache_beam as beam + + def generate_elements(elements): + for element in elements: + yield element + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'], + ['🍅Tomato', '🥔Potato'], + ]) + | 'Flatten lists' >> beam.FlatMap(generate_elements) + | beam.Map(print) + ) + # [END flatmap_generator] + if test: + test(plants) + + +def flatmap_multiple_arguments(test=None): + # [START flatmap_multiple_arguments] + import apache_beam as beam + + def split_words(text, delimiter=None): + return text.split(delimiter) + + with
beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '🍓Strawberry,🥕Carrot,🍆Eggplant', + '🍅Tomato,🥔Potato', + ]) + | 'Split words' >> beam.FlatMap(split_words, delimiter=',') + | beam.Map(print) + ) + # [END flatmap_multiple_arguments] + if test: + test(plants) + + +def flatmap_tuple(test=None): + # [START flatmap_tuple] + import apache_beam as beam + + def format_plant(icon, plant): + if icon: + yield '{}{}'.format(icon, plant) + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + ('🍓', 'Strawberry'), + ('🥕', 'Carrot'), + ('🍆', 'Eggplant'), + ('🍅', 'Tomato'), + ('🥔', 'Potato'), + (None, 'Invalid'), + ]) + | 'Format' >> beam.FlatMapTuple(format_plant) + | beam.Map(print) + ) + # [END flatmap_tuple] + if test: + test(plants) + + +def flatmap_side_inputs_singleton(test=None): + # [START flatmap_side_inputs_singleton] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + delimiter = pipeline | 'Create delimiter' >> beam.Create([',']) + + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '🍓Strawberry,🥕Carrot,🍆Eggplant', + '🍅Tomato,🥔Potato', + ]) + | 'Split words' >> beam.FlatMap( + lambda text, delimiter: text.split(delimiter), + delimiter=beam.pvalue.AsSingleton(delimiter), + ) + | beam.Map(print) + ) + # [END flatmap_side_inputs_singleton] + if test: + test(plants) + + +def flatmap_side_inputs_iter(test=None): + # [START flatmap_side_inputs_iter] + import apache_beam as beam + + def normalize_and_validate_durations(plant, valid_durations): + plant['duration'] = plant['duration'].lower() + if plant['duration'] in valid_durations: + yield plant + + with beam.Pipeline() as pipeline: + valid_durations = pipeline | 'Valid durations' >> beam.Create([ + 'annual', + 'biennial', + 'perennial', + ]) + + valid_plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'}, + {'icon': '🥕',
'name': 'Carrot', 'duration': 'BIENNIAL'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'}, + ]) + | 'Normalize and validate durations' >> beam.FlatMap( + normalize_and_validate_durations, + valid_durations=beam.pvalue.AsIter(valid_durations), + ) + | beam.Map(print) + ) + # [END flatmap_side_inputs_iter] + if test: + test(valid_plants) + + +def flatmap_side_inputs_dict(test=None): + # [START flatmap_side_inputs_dict] + import apache_beam as beam + + def replace_duration_if_valid(plant, durations): + if plant['duration'] in durations: + plant['duration'] = durations[plant['duration']] + yield plant + + with beam.Pipeline() as pipeline: + durations = pipeline | 'Durations dict' >> beam.Create([ + (0, 'annual'), + (1, 'biennial'), + (2, 'perennial'), + ]) + + valid_plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 2}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 1}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 2}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 0}, + {'icon': '🥔', 'name': 'Potato', 'duration': -1}, + ]) + | 'Replace duration if valid' >> beam.FlatMap( + replace_duration_if_valid, + durations=beam.pvalue.AsDict(durations), + ) + | beam.Map(print) + ) + # [END flatmap_side_inputs_dict] + if test: + test(valid_plants) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py new file mode 100644 index 0000000..718dcee --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py @@ -0,0 +1,92 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements.
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import flatmap + + +def check_plants(actual): + # [START plants] + plants = [ + '🍓Strawberry', + '🥕Carrot', + '🍆Eggplant', + '🍅Tomato', + '🥔Potato', + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +def check_valid_plants(actual): + # [START valid_plants] + valid_plants = [ + {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, + ] + # [END valid_plants] + assert_that(actual, equal_to(valid_plants)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.flatmap.print', lambda elem: elem) +# pylint: enable=line-too-long +class FlatMapTest(unittest.TestCase): + def test_flatmap_simple(self): + flatmap.flatmap_simple(check_plants) + + def test_flatmap_function(self): + flatmap.flatmap_function(check_plants) + +
def test_flatmap_lambda(self): + flatmap.flatmap_lambda(check_plants) + + def test_flatmap_generator(self): + flatmap.flatmap_generator(check_plants) + + def test_flatmap_multiple_arguments(self): + flatmap.flatmap_multiple_arguments(check_plants) + + def test_flatmap_tuple(self): + flatmap.flatmap_tuple(check_plants) + + def test_flatmap_side_inputs_singleton(self): + flatmap.flatmap_side_inputs_singleton(check_plants) + + def test_flatmap_side_inputs_iter(self): + flatmap.flatmap_side_inputs_iter(check_valid_plants) + + def test_flatmap_side_inputs_dict(self): + flatmap.flatmap_side_inputs_dict(check_valid_plants) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py new file mode 100644 index 0000000..01c9d6b --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py @@ -0,0 +1,42 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def keys(test=None): + # [START keys] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + icons = ( + pipeline + | 'Garden plants' >> beam.Create([ + ('🍓', 'Strawberry'), + ('🥕', 'Carrot'), + ('🍆', 'Eggplant'), + ('🍅', 'Tomato'), + ('🥔', 'Potato'), + ]) + | 'Keys' >> beam.Keys() + | beam.Map(print) + ) + # [END keys] + if test: + test(icons) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py new file mode 100644 index 0000000..780c5e4 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py @@ -0,0 +1,56 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from .
import keys + + +def check_icons(actual): + # [START icons] + icons = [ + '🍓', + '🥕', + '🍆', + '🍅', + '🥔', + ] + # [END icons] + assert_that(actual, equal_to(icons)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.keys.print', lambda elem: elem) +# pylint: enable=line-too-long +class KeysTest(unittest.TestCase): + def test_keys(self): + keys.keys(check_icons) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py new file mode 100644 index 0000000..2107fd5 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py @@ -0,0 +1,42 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def kvswap(test=None): + # [START kvswap] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Garden plants' >> beam.Create([ + ('🍓', 'Strawberry'), + ('🥕', 'Carrot'), + ('🍆', 'Eggplant'), + ('🍅', 'Tomato'), + ('🥔', 'Potato'), + ]) + | 'Key-Value swap' >> beam.KvSwap() + | beam.Map(print) + ) + # [END kvswap] + if test: + test(plants) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py new file mode 100644 index 0000000..ea7698b --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py @@ -0,0 +1,56 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from .
import kvswap + + +def check_plants(actual): + # [START plants] + plants = [ + ('Strawberry', 'π'), + ('Carrot', 'π₯'), + ('Eggplant', 'π'), + ('Tomato', 'π '), + ('Potato', 'π₯'), + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.kvswap.print', lambda elem: elem) +# pylint: enable=line-too-long +class KvSwapTest(unittest.TestCase): + def test_kvswap(self): + kvswap.kvswap(check_plants) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py new file mode 100644 index 0000000..9defd47 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py @@ -0,0 +1,226 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def map_simple(test=None): + # [START map_simple] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + ' πStrawberry \n', + ' π₯Carrot \n', + ' πEggplant \n', + ' π Tomato \n', + ' π₯Potato \n', + ]) + | 'Strip' >> beam.Map(str.strip) + | beam.Map(print) + ) + # [END map_simple] + if test: + test(plants) + + +def map_function(test=None): + # [START map_function] + import apache_beam as beam + + def strip_header_and_newline(text): + return text.strip('# \n') + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '# πStrawberry\n', + '# π₯Carrot\n', + '# πEggplant\n', + '# π Tomato\n', + '# π₯Potato\n', + ]) + | 'Strip header' >> beam.Map(strip_header_and_newline) + | beam.Map(print) + ) + # [END map_function] + if test: + test(plants) + + +def map_lambda(test=None): + # [START map_lambda] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '# πStrawberry\n', + '# π₯Carrot\n', + '# πEggplant\n', + '# π Tomato\n', + '# π₯Potato\n', + ]) + | 'Strip header' >> beam.Map(lambda text: text.strip('# \n')) + | beam.Map(print) + ) + # [END map_lambda] + if test: + test(plants) + + +def map_multiple_arguments(test=None): + # [START map_multiple_arguments] + import apache_beam as beam + + def strip(text, chars=None): + return text.strip(chars) + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '# πStrawberry\n', + '# π₯Carrot\n', + '# πEggplant\n', + '# π Tomato\n', + '# π₯Potato\n', + ]) + | 'Strip header' >> beam.Map(strip, chars='# \n') + | beam.Map(print) + ) + # [END map_multiple_arguments] + if test: + test(plants) + + +def map_tuple(test=None): + # [START map_tuple] + import apache_beam as beam + + with beam.Pipeline() as 
pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + ('π', 'Strawberry'), + ('π₯', 'Carrot'), + ('π', 'Eggplant'), + ('π ', 'Tomato'), + ('π₯', 'Potato'), + ]) + | 'Format' >> beam.MapTuple( + lambda icon, plant: '{}{}'.format(icon, plant)) + | beam.Map(print) + ) + # [END map_tuple] + if test: + test(plants) + + +def map_side_inputs_singleton(test=None): + # [START map_side_inputs_singleton] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + chars = pipeline | 'Create chars' >> beam.Create(['# \n']) + + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '# πStrawberry\n', + '# π₯Carrot\n', + '# πEggplant\n', + '# π Tomato\n', + '# π₯Potato\n', + ]) + | 'Strip header' >> beam.Map( + lambda text, chars: text.strip(chars), + chars=beam.pvalue.AsSingleton(chars), + ) + | beam.Map(print) + ) + # [END map_side_inputs_singleton] + if test: + test(plants) + + +def map_side_inputs_iter(test=None): + # [START map_side_inputs_iter] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n']) + + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + '# πStrawberry\n', + '# π₯Carrot\n', + '# πEggplant\n', + '# π Tomato\n', + '# π₯Potato\n', + ]) + | 'Strip header' >> beam.Map( + lambda text, chars: text.strip(''.join(chars)), + chars=beam.pvalue.AsIter(chars), + ) + | beam.Map(print) + ) + # [END map_side_inputs_iter] + if test: + test(plants) + + +def map_side_inputs_dict(test=None): + # [START map_side_inputs_dict] + import apache_beam as beam + + def replace_duration(plant, durations): + plant['duration'] = durations[plant['duration']] + return plant + + with beam.Pipeline() as pipeline: + durations = pipeline | 'Durations' >> beam.Create([ + (0, 'annual'), + (1, 'biennial'), + (2, 'perennial'), + ]) + + plant_details = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': 'π', 'name': 'Strawberry', 'duration': 2}, + {'icon': 
'π₯', 'name': 'Carrot', 'duration': 1}, + {'icon': 'π', 'name': 'Eggplant', 'duration': 2}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 0}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 2}, + ]) + | 'Replace duration' >> beam.Map( + replace_duration, + durations=beam.pvalue.AsDict(durations), + ) + | beam.Map(print) + ) + # [END map_side_inputs_dict] + if test: + test(plant_details) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py new file mode 100644 index 0000000..4186176 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py @@ -0,0 +1,90 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import map + + +def check_plants(actual): + # [START plants] + plants = [ + 'πStrawberry', + 'π₯Carrot', + 'πEggplant', + 'π Tomato', + 'π₯Potato', + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +def check_plant_details(actual): + # [START plant_details] + plant_details = [ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ] + # [END plant_details] + assert_that(actual, equal_to(plant_details)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.map.print', lambda elem: elem) +# pylint: enable=line-too-long +class MapTest(unittest.TestCase): + def test_map_simple(self): + map.map_simple(check_plants) + + def test_map_function(self): + map.map_function(check_plants) + + def test_map_lambda(self): + map.map_lambda(check_plants) + + def test_map_multiple_arguments(self): + map.map_multiple_arguments(check_plants) + + def test_map_tuple(self): + map.map_tuple(check_plants) + + def test_map_side_inputs_singleton(self): + map.map_side_inputs_singleton(check_plants) + + def test_map_side_inputs_iter(self): + map.map_side_inputs_iter(check_plants) + + def test_map_side_inputs_dict(self): + map.map_side_inputs_dict(check_plant_details) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py new file mode 100644 index 0000000..971e9f0 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py @@ -0,0 +1,126 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor 
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function +from __future__ import unicode_literals + + +def pardo_dofn(test=None): + # [START pardo_dofn] + import apache_beam as beam + + class SplitWords(beam.DoFn): + def __init__(self, delimiter=','): + self.delimiter = delimiter + + def process(self, text): + for word in text.split(self.delimiter): + yield word + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create([ + 'πStrawberry,π₯Carrot,πEggplant', + 'π Tomato,π₯Potato', + ]) + | 'Split words' >> beam.ParDo(SplitWords(',')) + | beam.Map(print) + ) + # [END pardo_dofn] + if test: + test(plants) + + +def pardo_dofn_params(test=None): + # pylint: disable=line-too-long + # [START pardo_dofn_params] + import apache_beam as beam + + class AnalyzeElement(beam.DoFn): + def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam): + yield '\n'.join([ + '# timestamp', + 'type(timestamp) -> ' + repr(type(timestamp)), + 'timestamp.micros -> ' + repr(timestamp.micros), + 'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()), + 'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()), + '', + '# window', + 'type(window) -> ' + repr(type(window)), + 'window.start -> 
{} ({})'.format(window.start, window.start.to_utc_datetime()), + 'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()), + 'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()), + ]) + + with beam.Pipeline() as pipeline: + dofn_params = ( + pipeline + | 'Create a single test element' >> beam.Create([':)']) + | 'Add timestamp (Spring equinox 2020)' >> beam.Map( + lambda elem: beam.window.TimestampedValue(elem, 1584675660)) + | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30)) + | 'Analyze element' >> beam.ParDo(AnalyzeElement()) + | beam.Map(print) + ) + # [END pardo_dofn_params] + # pylint: enable=line-too-long + if test: + test(dofn_params) + + +def pardo_dofn_methods(test=None): + # [START pardo_dofn_methods] + import apache_beam as beam + + class DoFnMethods(beam.DoFn): + def __init__(self): + print('__init__') + self.window = beam.window.GlobalWindow() + + def setup(self): + print('setup') + + def start_bundle(self): + print('start_bundle') + + def process(self, element, window=beam.DoFn.WindowParam): + self.window = window + yield '* process: ' + element + + def finish_bundle(self): + yield beam.utils.windowed_value.WindowedValue( + value='* finish_bundle: π±π³π', + timestamp=0, + windows=[self.window], + ) + + def teardown(self): + print('teardown') + + with beam.Pipeline() as pipeline: + results = ( + pipeline + | 'Create inputs' >> beam.Create(['π', 'π₯', 'π', 'π ', 'π₯']) + | 'DoFn methods' >> beam.ParDo(DoFnMethods()) + | beam.Map(print) + ) + # [END pardo_dofn_methods] + if test: + return test(results) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py new file mode 100644 index 0000000..8507e01 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py @@ -0,0 +1,120 @@ +# coding=utf-8 +# +# Licensed 
to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function +from __future__ import unicode_literals + +import io +import platform +import sys +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import pardo + + +def check_plants(actual): + # [START plants] + plants = [ + 'πStrawberry', + 'π₯Carrot', + 'πEggplant', + 'π Tomato', + 'π₯Potato', + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +def check_dofn_params(actual): + # pylint: disable=line-too-long + dofn_params = '\n'.join('''[START dofn_params] +# timestamp +type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'> +timestamp.micros -> 1584675660000000 +timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z' +timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41) + +# window +type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'> +window.start -> Timestamp(1584675660) (2020-03-20 03:41:00) +window.end -> Timestamp(1584675690) (2020-03-20 03:41:30) +window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999) +[END dofn_params]'''.splitlines()[1:-1]) + # pylint: enable=line-too-long + assert_that(actual, equal_to([dofn_params])) + + +def check_dofn_methods(actual): + # Return the expected stdout to check the ordering of the called methods. + return '''[START results] +__init__ +setup +start_bundle +* process: π +* process: π₯ +* process: π +* process: π +* process: π₯ +* finish_bundle: π±π³π +teardown +[END results]'''.splitlines()[1:-1] + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.pardo.print', lambda elem: elem) +# pylint: enable=line-too-long +class ParDoTest(unittest.TestCase): + def test_pardo_dofn(self): + pardo.pardo_dofn(check_plants) + + # TODO: Remove this after Python 2 deprecation. 
+ # https://issues.apache.org/jira/browse/BEAM-8124 + @unittest.skipIf(sys.version_info[0] < 3 and platform.system() == 'Windows', + 'Python 2 on Windows uses `long` rather than `int`') + def test_pardo_dofn_params(self): + pardo.pardo_dofn_params(check_dofn_params) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch('sys.stdout', new_callable=io.StringIO) +class ParDoStdoutTest(unittest.TestCase): + def test_pardo_dofn_methods(self, mock_stdout): + expected = pardo.pardo_dofn_methods(check_dofn_methods) + actual = mock_stdout.getvalue().splitlines() + + # For the stdout, check the ordering of the methods, not of the elements. + actual_stdout = [line.split(':')[0] for line in actual] + expected_stdout = [line.split(':')[0] for line in expected] + self.assertEqual(actual_stdout, expected_stdout) + + # For the elements, ignore the stdout and just make sure all elements match. + actual_elements = {line for line in actual if line.startswith('*')} + expected_elements = {line for line in expected if line.startswith('*')} + self.assertEqual(actual_elements, expected_elements) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py new file mode 100644 index 0000000..6f839d4 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py @@ -0,0 +1,136 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + + +def partition_function(test=None): + # [START partition_function] + import apache_beam as beam + + durations = ['annual', 'biennial', 'perennial'] + + def by_duration(plant, num_partitions): + return durations.index(plant['duration']) + + with beam.Pipeline() as pipeline: + annuals, biennials, perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Partition' >> beam.Partition(by_duration, len(durations)) + ) + _ = ( + annuals + | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x))) + ) + _ = ( + biennials + | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x))) + ) + _ = ( + perennials + | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x))) + ) + # [END partition_function] + if test: + test(annuals, biennials, perennials) + + +def partition_lambda(test=None): + # [START partition_lambda] + import apache_beam as beam + + durations = ['annual', 'biennial', 'perennial'] + + with beam.Pipeline() as pipeline: + annuals, biennials, perennials = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + 
{'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Partition' >> beam.Partition( + lambda plant, num_partitions: durations.index(plant['duration']), + len(durations), + ) + ) + _ = ( + annuals + | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x))) + ) + _ = ( + biennials + | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x))) + ) + _ = ( + perennials + | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x))) + ) + # [END partition_lambda] + if test: + test(annuals, biennials, perennials) + + +def partition_multiple_arguments(test=None): + # [START partition_multiple_arguments] + import apache_beam as beam + import json + + def split_dataset(plant, num_partitions, ratio): + assert num_partitions == len(ratio) + bucket = sum(map(ord, json.dumps(plant))) % sum(ratio) + total = 0 + for i, part in enumerate(ratio): + total += part + if bucket < total: + return i + return len(ratio) - 1 + + with beam.Pipeline() as pipeline: + train_dataset, test_dataset = ( + pipeline + | 'Gardening plants' >> beam.Create([ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ]) + | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2]) + ) + _ = ( + train_dataset + | 'Train' >> beam.Map(lambda x: print('train: ' + str(x))) + ) + _ = ( + test_dataset + | 'Test' >> beam.Map(lambda x: print('test: ' + str(x))) + ) + # [END partition_multiple_arguments] + if test: + test(train_dataset, test_dataset) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py 
b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py new file mode 100644 index 0000000..0b8ae3d --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py @@ -0,0 +1,84 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import partition + + +def check_partitions(actual1, actual2, actual3): + # [START partitions] + annuals = [ + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + ] + biennials = [ + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + ] + perennials = [ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ] + # [END partitions] + assert_that(actual1, equal_to(annuals), label='assert annuals') + assert_that(actual2, equal_to(biennials), label='assert biennials') + assert_that(actual3, equal_to(perennials), label='assert perennials') + + +def check_split_datasets(actual1, actual2): + # [START train_test] + train_dataset = [ + {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'}, + {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'}, + {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'}, + ] + test_dataset = [ + {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'}, + {'icon': 'π ', 'name': 'Tomato', 'duration': 'annual'}, + ] + # [END train_test] + assert_that(actual1, equal_to(train_dataset), label='assert train') + assert_that(actual2, equal_to(test_dataset), label='assert test') + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.partition.print', lambda elem: elem) +# pylint: enable=line-too-long +class PartitionTest(unittest.TestCase): + def test_partition_function(self): + partition.partition_function(check_partitions) + + def test_partition_lambda(self): + partition.partition_lambda(check_partitions) + + def test_partition_multiple_arguments(self): + partition.partition_multiple_arguments(check_split_datasets) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py 
b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py new file mode 100644 index 0000000..b39b534 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py @@ -0,0 +1,236 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + + +def regex_matches(test=None): + # [START regex_matches] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. + regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_matches = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π, Strawberry, perennial', + 'π₯, Carrot, biennial ignoring trailing words', + 'π, Eggplant, perennial', + 'π , Tomato, annual', + 'π₯, Potato, perennial', + '# π, invalid, format', + 'invalid, π, format', + ]) + | 'Parse plants' >> beam.Regex.matches(regex) + | beam.Map(print) + ) + # [END regex_matches] + if test: + test(plants_matches) + + +def regex_all_matches(test=None): + # [START regex_all_matches] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. 
+ regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_all_matches = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π, Strawberry, perennial', + 'π₯, Carrot, biennial ignoring trailing words', + 'π, Eggplant, perennial', + 'π , Tomato, annual', + 'π₯, Potato, perennial', + '# π, invalid, format', + 'invalid, π, format', + ]) + | 'Parse plants' >> beam.Regex.all_matches(regex) + | beam.Map(print) + ) + # [END regex_all_matches] + if test: + test(plants_all_matches) + + +def regex_matches_kv(test=None): + # [START regex_matches_kv] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. + regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_matches_kv = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π, Strawberry, perennial', + 'π₯, Carrot, biennial ignoring trailing words', + 'π, Eggplant, perennial', + 'π , Tomato, annual', + 'π₯, Potato, perennial', + '# π, invalid, format', + 'invalid, π, format', + ]) + | 'Parse plants' >> beam.Regex.matches_kv(regex, keyGroup='icon') + | beam.Map(print) + ) + # [END regex_matches_kv] + if test: + test(plants_matches_kv) + + +def regex_find(test=None): + # [START regex_find] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. 
+ regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_matches = ( + pipeline + | 'Garden plants' >> beam.Create([ + '# π, Strawberry, perennial', + '# π₯, Carrot, biennial ignoring trailing words', + '# π, Eggplant, perennial - π, Banana, perennial', + '# π , Tomato, annual - π, Watermelon, annual', + '# π₯, Potato, perennial', + ]) + | 'Parse plants' >> beam.Regex.find(regex) + | beam.Map(print) + ) + # [END regex_find] + if test: + test(plants_matches) + + +def regex_find_all(test=None): + # [START regex_find_all] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. + regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_find_all = ( + pipeline + | 'Garden plants' >> beam.Create([ + '# π, Strawberry, perennial', + '# π₯, Carrot, biennial ignoring trailing words', + '# π, Eggplant, perennial - π, Banana, perennial', + '# π , Tomato, annual - π, Watermelon, annual', + '# π₯, Potato, perennial', + ]) + | 'Parse plants' >> beam.Regex.find_all(regex) + | beam.Map(print) + ) + # [END regex_find_all] + if test: + test(plants_find_all) + + +def regex_find_kv(test=None): + # [START regex_find_kv] + import apache_beam as beam + + # Matches a named group 'icon', and then two comma-separated groups. 
+ regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)' + with beam.Pipeline() as pipeline: + plants_matches_kv = ( + pipeline + | 'Garden plants' >> beam.Create([ + '# π, Strawberry, perennial', + '# π₯, Carrot, biennial ignoring trailing words', + '# π, Eggplant, perennial - π, Banana, perennial', + '# π , Tomato, annual - π, Watermelon, annual', + '# π₯, Potato, perennial', + ]) + | 'Parse plants' >> beam.Regex.find_kv(regex, keyGroup='icon') + | beam.Map(print) + ) + # [END regex_find_kv] + if test: + test(plants_matches_kv) + + +def regex_replace_all(test=None): + # [START regex_replace_all] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants_replace_all = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π : Strawberry : perennial', + 'π₯ : Carrot : biennial', + 'π\t:\tEggplant\t:\tperennial', + 'π : Tomato : annual', + 'π₯ : Potato : perennial', + ]) + | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',') + | beam.Map(print) + ) + # [END regex_replace_all] + if test: + test(plants_replace_all) + + +def regex_replace_first(test=None): + # [START regex_replace_first] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants_replace_first = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π, Strawberry, perennial', + 'π₯, Carrot, biennial', + 'π,\tEggplant, perennial', + 'π , Tomato, annual', + 'π₯, Potato, perennial', + ]) + | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ') + | beam.Map(print) + ) + # [END regex_replace_first] + if test: + test(plants_replace_first) + + +def regex_split(test=None): + # [START regex_split] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants_split = ( + pipeline + | 'Garden plants' >> beam.Create([ + 'π : Strawberry : perennial', + 'π₯ : Carrot : biennial', + 'π\t:\tEggplant : perennial', + 'π : Tomato : annual', + 'π₯ : Potato : perennial', + ]) + | 'Parse plants' >> beam.Regex.split(r'\s*:\s*') + | beam.Map(print) + ) + # [END regex_split] + if test: 
+ test(plants_split) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py new file mode 100644 index 0000000..8312312 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py @@ -0,0 +1,173 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import regex + + +def check_matches(actual): + # [START plants_matches] + plants_matches = [ + '🍓, Strawberry, perennial', + '🥕, Carrot, biennial', + '🍆, Eggplant, perennial', + '🍅, Tomato, annual', + '🥔, Potato, perennial', + ] + # [END plants_matches] + assert_that(actual, equal_to(plants_matches)) + + +def check_all_matches(actual): + # [START plants_all_matches] + plants_all_matches = [ + ['🍓, Strawberry, perennial', '🍓', 'Strawberry', 'perennial'], + ['🥕, Carrot, biennial', '🥕', 'Carrot', 'biennial'], + ['🍆, Eggplant, perennial', '🍆', 'Eggplant', 'perennial'], + ['🍅, Tomato, annual', '🍅', 'Tomato', 'annual'], + ['🥔, Potato, perennial', '🥔', 'Potato', 'perennial'], + ] + # [END plants_all_matches] + assert_that(actual, equal_to(plants_all_matches)) + + +def check_matches_kv(actual): + # [START plants_matches_kv] + plants_matches_kv = [ + ('🍓', '🍓, Strawberry, perennial'), + ('🥕', '🥕, Carrot, biennial'), + ('🍆', '🍆, Eggplant, perennial'), + ('🍅', '🍅, Tomato, annual'), + ('🥔', '🥔, Potato, perennial'), + ] + # [END plants_matches_kv] + assert_that(actual, equal_to(plants_matches_kv)) + + +def check_find_all(actual): + # [START plants_find_all] + plants_find_all = [ + ['🍓, Strawberry, perennial'], + ['🥕, Carrot, biennial'], + ['🍆, Eggplant, perennial', '🍌, Banana, perennial'], + ['🍅, Tomato, annual', '🍉, Watermelon, annual'], + ['🥔, Potato, perennial'], + ] + # [END plants_find_all] + assert_that(actual, equal_to(plants_find_all)) + + +def check_find_kv(actual): + # [START plants_find_kv] + plants_find_all = [ + ('🍓', '🍓, Strawberry, perennial'), + ('🥕', '🥕, Carrot, biennial'), + ('🍆', '🍆, Eggplant, perennial'), + ('🍌', '🍌, Banana, perennial'), + ('🍅', '🍅, Tomato, annual'), + ('🍉', '🍉, Watermelon, annual'), + ('🥔', '🥔, Potato, perennial'), + ] + # [END plants_find_kv] + assert_that(actual, equal_to(plants_find_all)) + + +def check_replace_all(actual): + # [START plants_replace_all] + plants_replace_all = [ + '🍓,Strawberry,perennial', + 
'🥕,Carrot,biennial', + '🍆,Eggplant,perennial', + '🍅,Tomato,annual', + '🥔,Potato,perennial', + ] + # [END plants_replace_all] + assert_that(actual, equal_to(plants_replace_all)) + + +def check_replace_first(actual): + # [START plants_replace_first] + plants_replace_first = [ + '🍓: Strawberry, perennial', + '🥕: Carrot, biennial', + '🍆: Eggplant, perennial', + '🍅: Tomato, annual', + '🥔: Potato, perennial', + ] + # [END plants_replace_first] + assert_that(actual, equal_to(plants_replace_first)) + + +def check_split(actual): + # [START plants_split] + plants_split = [ + ['🍓', 'Strawberry', 'perennial'], + ['🥕', 'Carrot', 'biennial'], + ['🍆', 'Eggplant', 'perennial'], + ['🍅', 'Tomato', 'annual'], + ['🥔', 'Potato', 'perennial'], + ] + # [END plants_split] + assert_that(actual, equal_to(plants_split)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.regex.print', lambda elem: elem) +# pylint: enable=line-too-long +class RegexTest(unittest.TestCase): + def test_matches(self): + regex.regex_matches(check_matches) + + def test_all_matches(self): + regex.regex_all_matches(check_all_matches) + + def test_matches_kv(self): + regex.regex_matches_kv(check_matches_kv) + + def test_find(self): + regex.regex_find(check_matches) + + def test_find_all(self): + regex.regex_find_all(check_find_all) + + def test_find_kv(self): + regex.regex_find_kv(check_find_kv) + + def test_replace_all(self): + regex.regex_replace_all(check_replace_all) + + def test_replace_first(self): + regex.regex_replace_first(check_replace_first) + + def test_split(self): + regex.regex_split(check_split) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py new file mode 100644 index 0000000..1d0b7dd --- /dev/null +++ 
b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py @@ -0,0 +1,86 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + + +def tostring_kvs(test=None): + # [START tostring_kvs] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Garden plants' >> beam.Create([ + ('🍓', 'Strawberry'), + ('🥕', 'Carrot'), + ('🍆', 'Eggplant'), + ('🍅', 'Tomato'), + ('🥔', 'Potato'), + ]) + | 'To string' >> beam.ToString.Kvs() + | beam.Map(print) + ) + # [END tostring_kvs] + if test: + test(plants) + + +def tostring_element(test=None): + # [START tostring_element] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plant_lists = ( + pipeline + | 'Garden plants' >> beam.Create([ + ['🍓', 'Strawberry', 'perennial'], + ['🥕', 'Carrot', 'biennial'], + ['🍆', 'Eggplant', 'perennial'], + ['🍅', 'Tomato', 'annual'], + ['🥔', 'Potato', 'perennial'], + ]) + | 'To string' >> beam.ToString.Element() + | beam.Map(print) + ) + # [END tostring_element] + if test: + test(plant_lists) + + +def tostring_iterables(test=None): + # [START tostring_iterables] + import apache_beam as beam + + with 
beam.Pipeline() as pipeline: + plants_csv = ( + pipeline + | 'Garden plants' >> beam.Create([ + ['🍓', 'Strawberry', 'perennial'], + ['🥕', 'Carrot', 'biennial'], + ['🍆', 'Eggplant', 'perennial'], + ['🍅', 'Tomato', 'annual'], + ['🥔', 'Potato', 'perennial'], + ]) + | 'To string' >> beam.ToString.Iterables() + | beam.Map(print) + ) + # [END tostring_iterables] + if test: + test(plants_csv) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py new file mode 100644 index 0000000..b253ea1 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py @@ -0,0 +1,105 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import sys +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import tostring + + +def check_plants(actual): + # [START plants] + plants = [ + '🍓,Strawberry', + '🥕,Carrot', + '🍆,Eggplant', + '🍅,Tomato', + '🥔,Potato', + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +def check_plant_lists(actual): + # [START plant_lists] + plant_lists = [ + "['🍓', 'Strawberry', 'perennial']", + "['🥕', 'Carrot', 'biennial']", + "['🍆', 'Eggplant', 'perennial']", + "['🍅', 'Tomato', 'annual']", + "['🥔', 'Potato', 'perennial']", + ] + # [END plant_lists] + + # Some unicode characters become escaped with double backslashes. + import apache_beam as beam + + def normalize_escaping(elem): + # In Python 2 all utf-8 characters are escaped with double backslashes. + # TODO: Remove this after Python 2 deprecation. + # https://issues.apache.org/jira/browse/BEAM-8124 + if sys.version_info.major == 2: + return elem.decode('string-escape') + + # In Python 3.5 some utf-8 characters are escaped with double backslashes. + if '\\' in elem: + return bytes(elem, 'utf-8').decode('unicode-escape') + return elem + actual = actual | beam.Map(normalize_escaping) + assert_that(actual, equal_to(plant_lists)) + + +def check_plants_csv(actual): + # [START plants_csv] + plants_csv = [ + '🍓,Strawberry,perennial', + '🥕,Carrot,biennial', + '🍆,Eggplant,perennial', + '🍅,Tomato,annual', + '🥔,Potato,perennial', + ] + # [END plants_csv] + assert_that(actual, equal_to(plants_csv)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.tostring.print', lambda elem: elem) +# pylint: enable=line-too-long +class ToStringTest(unittest.TestCase): + def test_tostring_kvs(self): + tostring.tostring_kvs(check_plants) + + def test_tostring_element(self): + tostring.tostring_element(check_plant_lists) + + def test_tostring_iterables(self): + tostring.tostring_iterables(check_plants_csv) + + +if __name__ == '__main__': + unittest.main() diff --git 
a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py new file mode 100644 index 0000000..8504ff4 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py @@ -0,0 +1,42 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def values(test=None): + # [START values] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Garden plants' >> beam.Create([ + ('🍓', 'Strawberry'), + ('🥕', 'Carrot'), + ('🍆', 'Eggplant'), + ('🍅', 'Tomato'), + ('🥔', 'Potato'), + ]) + | 'Values' >> beam.Values() + | beam.Map(print) + ) + # [END values] + if test: + test(plants) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py new file mode 100644 index 0000000..06abef6 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py @@ -0,0 +1,56 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . 
import values + + +def check_plants(actual): + # [START plants] + plants = [ + 'Strawberry', + 'Carrot', + 'Eggplant', + 'Tomato', + 'Potato', + ] + # [END plants] + assert_that(actual, equal_to(plants)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.values.print', lambda elem: elem) +# pylint: enable=line-too-long +class ValuesTest(unittest.TestCase): + def test_values(self): + values.values(check_plants) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py new file mode 100644 index 0000000..d1e2e3f --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py @@ -0,0 +1,128 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def withtimestamps_event_time(test=None): + # [START withtimestamps_event_time] + import apache_beam as beam + + class GetTimestamp(beam.DoFn): + def process(self, plant, timestamp=beam.DoFn.TimestampParam): + yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name']) + + with beam.Pipeline() as pipeline: + plant_timestamps = ( + pipeline + | 'Garden plants' >> beam.Create([ + {'name': 'Strawberry', 'season': 1585699200}, # April, 2020 + {'name': 'Carrot', 'season': 1590969600}, # June, 2020 + {'name': 'Artichoke', 'season': 1583020800}, # March, 2020 + {'name': 'Tomato', 'season': 1588291200}, # May, 2020 + {'name': 'Potato', 'season': 1598918400}, # September, 2020 + ]) + | 'With timestamps' >> beam.Map( + lambda plant: beam.window.TimestampedValue(plant, plant['season'])) + | 'Get timestamp' >> beam.ParDo(GetTimestamp()) + | beam.Map(print) + ) + # [END withtimestamps_event_time] + if test: + test(plant_timestamps) + + +def withtimestamps_logical_clock(test=None): + # [START withtimestamps_logical_clock] + import apache_beam as beam + + class GetTimestamp(beam.DoFn): + def process(self, plant, timestamp=beam.DoFn.TimestampParam): + event_id = int(timestamp.micros / 1e6) # equivalent to seconds + yield '{} - {}'.format(event_id, plant['name']) + + with beam.Pipeline() as pipeline: + plant_events = ( + pipeline + | 'Garden plants' >> beam.Create([ + {'name': 'Strawberry', 'event_id': 1}, + {'name': 'Carrot', 'event_id': 4}, + {'name': 'Artichoke', 'event_id': 2}, + {'name': 'Tomato', 'event_id': 3}, + {'name': 'Potato', 'event_id': 5}, + ]) + | 'With timestamps' >> beam.Map(lambda plant: \ + beam.window.TimestampedValue(plant, plant['event_id'])) + | 'Get timestamp' >> beam.ParDo(GetTimestamp()) + | beam.Map(print) + ) + # [END withtimestamps_logical_clock] + if test: + test(plant_events) + + +def withtimestamps_processing_time(test=None): + # [START withtimestamps_processing_time] + import 
apache_beam as beam + import time + + class GetTimestamp(beam.DoFn): + def process(self, plant, timestamp=beam.DoFn.TimestampParam): + yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name']) + + with beam.Pipeline() as pipeline: + plant_processing_times = ( + pipeline + | 'Garden plants' >> beam.Create([ + {'name': 'Strawberry'}, + {'name': 'Carrot'}, + {'name': 'Artichoke'}, + {'name': 'Tomato'}, + {'name': 'Potato'}, + ]) + | 'With timestamps' >> beam.Map(lambda plant: \ + beam.window.TimestampedValue(plant, time.time())) + | 'Get timestamp' >> beam.ParDo(GetTimestamp()) + | beam.Map(print) + ) + # [END withtimestamps_processing_time] + if test: + test(plant_processing_times) + + +def time_tuple2unix_time(): + # [START time_tuple2unix_time] + import time + + time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S') + unix_time = time.mktime(time_tuple) + # [END time_tuple2unix_time] + return unix_time + + +def datetime2unix_time(): + # [START datetime2unix_time] + import time + import datetime + + now = datetime.datetime.now() + time_tuple = now.timetuple() + unix_time = time.mktime(time_tuple) + # [END datetime2unix_time] + return unix_time diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py new file mode 100644 index 0000000..53fa7e2 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py @@ -0,0 +1,103 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import withtimestamps + + +def check_plant_timestamps(actual): + # [START plant_timestamps] + plant_timestamps = [ + '2020-04-01 00:00:00 - Strawberry', + '2020-06-01 00:00:00 - Carrot', + '2020-03-01 00:00:00 - Artichoke', + '2020-05-01 00:00:00 - Tomato', + '2020-09-01 00:00:00 - Potato', + ] + # [END plant_timestamps] + assert_that(actual, equal_to(plant_timestamps)) + + +def check_plant_events(actual): + # [START plant_events] + plant_events = [ + '1 - Strawberry', + '4 - Carrot', + '2 - Artichoke', + '3 - Tomato', + '5 - Potato', + ] + # [END plant_events] + assert_that(actual, equal_to(plant_events)) + + +def check_plant_processing_times(actual): + import apache_beam as beam + + # [START plant_processing_times] + plant_processing_times = [ + '2020-03-20 20:12:42.145594 - Strawberry', + '2020-03-20 20:12:42.145827 - Carrot', + '2020-03-20 20:12:42.145962 - Artichoke', + '2020-03-20 20:12:42.146093 - Tomato', + '2020-03-20 20:12:42.146216 - Potato', + ] + # [END plant_processing_times] + + # Since `time.time()` will always give something different, we'll + # simply strip the timestamp information before testing the results. 
+ actual = actual | beam.Map(lambda row: row.split('-')[-1].strip()) + expected = [row.split('-')[-1].strip() for row in plant_processing_times] + assert_that(actual, equal_to(expected)) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch('apache_beam.examples.snippets.transforms.elementwise.withtimestamps.print', lambda elem: elem) +# pylint: enable=line-too-long +class WithTimestampsTest(unittest.TestCase): + def test_event_time(self): + withtimestamps.withtimestamps_event_time(check_plant_timestamps) + + def test_logical_clock(self): + withtimestamps.withtimestamps_logical_clock(check_plant_events) + + def test_processing_time(self): + withtimestamps.withtimestamps_processing_time(check_plant_processing_times) + + def test_time_tuple2unix_time(self): + unix_time = withtimestamps.time_tuple2unix_time() + self.assertIsInstance(unix_time, float) + + def test_datetime2unix_time(self): + unix_time = withtimestamps.datetime2unix_time() + self.assertIsInstance(unix_time, float) + + +if __name__ == '__main__': + unittest.main()