Repository: incubator-beam Updated Branches: refs/heads/python-sdk 0af015017 -> dbba2d3b1
Updates SourceTestBase concurrent splitting test to share thread pool across runs. Without this, runs could fail in environments that prevent too many threads from being created. Performs some slight fixes to the source_test_utils_test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b901c59a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b901c59a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b901c59a Branch: refs/heads/python-sdk Commit: b901c59a15ebe9171c3b216e666fc0f79a61429d Parents: 0af0150 Author: Chamikara Jayalath <chamik...@google.com> Authored: Thu Aug 25 23:18:42 2016 -0700 Committer: Chamikara Jayalath <chamik...@google.com> Committed: Thu Aug 25 23:18:42 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/source_test_utils.py | 68 +++++++++++--------- .../apache_beam/io/source_test_utils_test.py | 10 ++- 2 files changed, 45 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b901c59a/sdks/python/apache_beam/io/source_test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py index 1a2c21a..9968e9d 100644 --- a/sdks/python/apache_beam/io/source_test_utils.py +++ b/sdks/python/apache_beam/io/source_test_utils.py @@ -488,29 +488,33 @@ def assertSplitAtFractionExhaustive(source, perform_multi_threaded_test=True): have_success = False have_failure = False - while True: - num_trials += 1 - if (num_trials > - MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM): - logging.warn( - 'After %d concurrent splitting trials at item #%d, observed ' - 'only %s, giving up on this item', - num_trials, - i, - 'success' if have_success else 'failure' - ) - 
break - - if _assertSplitAtFractionConcurrent( - source, expected_items, i, min_non_trivial_fraction): - have_success = True - else: - have_failure = True - - if have_success and have_failure: - logging.info('%d trials to observe both success and failure of ' - 'concurrent splitting at item #%d', num_trials, i) - break + thread_pool = ThreadPool(2) + try: + while True: + num_trials += 1 + if (num_trials > + MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM): + logging.warn( + 'After %d concurrent splitting trials at item #%d, observed ' + 'only %s, giving up on this item', + num_trials, + i, + 'success' if have_success else 'failure' + ) + break + + if _assertSplitAtFractionConcurrent( + source, expected_items, i, min_non_trivial_fraction, thread_pool): + have_success = True + else: + have_failure = True + + if have_success and have_failure: + logging.info('%d trials to observe both success and failure of ' + 'concurrent splitting at item #%d', num_trials, i) + break + finally: + thread_pool.close() num_total_trials += num_trials @@ -525,7 +529,7 @@ def assertSplitAtFractionExhaustive(source, perform_multi_threaded_test=True): def _assertSplitAtFractionConcurrent( source, expected_items, num_items_to_read_before_splitting, - split_fraction): + split_fraction, thread_pool=None): range_tracker = source.get_range_tracker(None, None) stop_position_before_split = range_tracker.stop_position() @@ -545,13 +549,15 @@ def _assertSplitAtFractionConcurrent( return result inputs = [] - - inputs.append([True, reader_iter]) - inputs.append([False, range_tracker, split_fraction]) - - pool = ThreadPool(2) - results = pool.map(read_or_split, inputs) - pool.close() + pool = thread_pool if thread_pool else ThreadPool(2) + try: + inputs.append([True, reader_iter]) + inputs.append([False, range_tracker, split_fraction]) + + results = pool.map(read_or_split, inputs) + finally: + if not thread_pool: + pool.close() current_items.extend(results[0]) primary_range = ( 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b901c59a/sdks/python/apache_beam/io/source_test_utils_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py index 3aff07f..2a207b6 100644 --- a/sdks/python/apache_beam/io/source_test_utils_test.py +++ b/sdks/python/apache_beam/io/source_test_utils_test.py @@ -15,10 +15,12 @@ # limitations under the License. # +import logging import tempfile import unittest -from filebasedsource_test import LineSource -import source_test_utils + +from apache_beam.io.filebasedsource_test import LineSource +import apache_beam.io.source_test_utils as source_test_utils class SourceTestUtilsTest(unittest.TestCase): @@ -115,3 +117,7 @@ class SourceTestUtilsTest(unittest.TestCase): source = self._create_source(data) source_test_utils.assertSplitAtFractionExhaustive(source) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main()