Add a test to check memory consumption of the direct runner.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d021e9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d021e9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d021e9ce

Branch: refs/heads/python-sdk
Commit: d021e9ce7bccb770e187c720bd7a95bb99f1bc8e
Parents: 7f201cb
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 3 15:37:49 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Nov 7 17:56:49 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py | 37 +++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d021e9ce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index db3ad9e..a4c983f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -180,6 +180,43 @@ class PipelineTest(unittest.TestCase):
         ['a-x', 'b-x', 'c-x'],
         sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
 
+  def test_memory_usage(self):
+    try:
+      import resource
+    except ImportError:
+      # Skip the test if resource module is not available (e.g. non-Unix os).
+      self.skipTest('resource module not available.')
+
+    def get_memory_usage_in_bytes():
+      return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * (2 ** 10)
+
+    def check_memory(value, memory_threshold):
+      memory_usage = get_memory_usage_in_bytes()
+      if memory_usage > memory_threshold:
+        raise RuntimeError(
+            'High memory usage: %d > %d' % (memory_usage, memory_threshold))
+      return value
+
+    len_elements = 1000000
+    num_elements = 10
+    num_maps = 100
+
+    pipeline = Pipeline('DirectPipelineRunner')
+
+    # Consumed memory should not be proportional to the number of maps.
+    memory_threshold = (
+        get_memory_usage_in_bytes() + (3 * len_elements * num_elements))
+
+    biglist = pipeline | 'oom:create' >> Create(
+        ['x' * len_elements] * num_elements)
+    for i in range(num_maps):
+      biglist = biglist | ('oom:addone-%d' % i) >> Map(lambda x: x + 'y')
+    result = biglist | 'oom:check' >> Map(check_memory, memory_threshold)
+    assert_that(result, equal_to(
+        ['x' * len_elements + 'y' * num_maps] * num_elements))
+
+    pipeline.run()
+
   def test_pipeline_as_context(self):
     def raise_exception(exn):
       raise exn