Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 0af015017 -> dbba2d3b1


Updates SourceTestBase concurrent splitting test to share a thread pool across 
runs.

Without this, runs could fail in environments that prevent too many threads 
from being created.

Performs some slight fixes to the source_test_utils_test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b901c59a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b901c59a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b901c59a

Branch: refs/heads/python-sdk
Commit: b901c59a15ebe9171c3b216e666fc0f79a61429d
Parents: 0af0150
Author: Chamikara Jayalath <chamik...@google.com>
Authored: Thu Aug 25 23:18:42 2016 -0700
Committer: Chamikara Jayalath <chamik...@google.com>
Committed: Thu Aug 25 23:18:42 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/source_test_utils.py | 68 +++++++++++---------
 .../apache_beam/io/source_test_utils_test.py    | 10 ++-
 2 files changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b901c59a/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py 
b/sdks/python/apache_beam/io/source_test_utils.py
index 1a2c21a..9968e9d 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -488,29 +488,33 @@ def assertSplitAtFractionExhaustive(source, 
perform_multi_threaded_test=True):
     have_success = False
     have_failure = False
 
-    while True:
-      num_trials += 1
-      if (num_trials >
-          MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM):
-        logging.warn(
-            'After %d concurrent splitting trials at item #%d, observed '
-            'only %s, giving up on this item',
-            num_trials,
-            i,
-            'success' if have_success else 'failure'
-        )
-        break
-
-      if _assertSplitAtFractionConcurrent(
-          source, expected_items, i, min_non_trivial_fraction):
-        have_success = True
-      else:
-        have_failure = True
-
-      if have_success and have_failure:
-        logging.info('%d trials to observe both success and failure of '
-                     'concurrent splitting at item #%d', num_trials, i)
-        break
+    thread_pool = ThreadPool(2)
+    try:
+      while True:
+        num_trials += 1
+        if (num_trials >
+            MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM):
+          logging.warn(
+              'After %d concurrent splitting trials at item #%d, observed '
+              'only %s, giving up on this item',
+              num_trials,
+              i,
+              'success' if have_success else 'failure'
+          )
+          break
+
+        if _assertSplitAtFractionConcurrent(
+            source, expected_items, i, min_non_trivial_fraction, thread_pool):
+          have_success = True
+        else:
+          have_failure = True
+
+        if have_success and have_failure:
+          logging.info('%d trials to observe both success and failure of '
+                       'concurrent splitting at item #%d', num_trials, i)
+          break
+    finally:
+      thread_pool.close()
 
     num_total_trials += num_trials
 
@@ -525,7 +529,7 @@ def assertSplitAtFractionExhaustive(source, 
perform_multi_threaded_test=True):
 
 def _assertSplitAtFractionConcurrent(
     source, expected_items, num_items_to_read_before_splitting,
-    split_fraction):
+    split_fraction, thread_pool=None):
 
   range_tracker = source.get_range_tracker(None, None)
   stop_position_before_split = range_tracker.stop_position()
@@ -545,13 +549,15 @@ def _assertSplitAtFractionConcurrent(
       return result
 
   inputs = []
-
-  inputs.append([True, reader_iter])
-  inputs.append([False, range_tracker, split_fraction])
-
-  pool = ThreadPool(2)
-  results = pool.map(read_or_split, inputs)
-  pool.close()
+  pool = thread_pool if thread_pool else ThreadPool(2)
+  try:
+    inputs.append([True, reader_iter])
+    inputs.append([False, range_tracker, split_fraction])
+
+    results = pool.map(read_or_split, inputs)
+  finally:
+    if not thread_pool:
+      pool.close()
 
   current_items.extend(results[0])
   primary_range = (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b901c59a/sdks/python/apache_beam/io/source_test_utils_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py 
b/sdks/python/apache_beam/io/source_test_utils_test.py
index 3aff07f..2a207b6 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -15,10 +15,12 @@
 # limitations under the License.
 #
 
+import logging
 import tempfile
 import unittest
-from filebasedsource_test import LineSource
-import source_test_utils
+
+from apache_beam.io.filebasedsource_test import LineSource
+import apache_beam.io.source_test_utils as source_test_utils
 
 
 class SourceTestUtilsTest(unittest.TestCase):
@@ -115,3 +117,7 @@ class SourceTestUtilsTest(unittest.TestCase):
     source = self._create_source(data)
 
     source_test_utils.assertSplitAtFractionExhaustive(source)
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

Reply via email to