This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e8bda0e  Fix data races in BCJ and RecordingManager
     new 5d81e84  Merge pull request #12797 from [BEAM-10603] Fix data races in BCJ and RecordingManager
e8bda0e is described below

commit e8bda0ef9c7a99cdc600e3f280d5f80b5328e8de
Author: Sam Rohde <[email protected]>
AuthorDate: Wed Sep 9 10:47:31 2020 -0700

    Fix data races in BCJ and RecordingManager
    
    There were some data races regarding the non-threadsafe PipelineResult
    in the BackgroundCachingJob and the RecordingManager. This adds locks
    around PipelineResults.
    
    Change-Id: I612171dc6ef9225335cfeb1bfec9871986e43c47
---
 .../runners/interactive/background_caching_job.py  | 44 +++++++++++++++-------
 .../runners/interactive/recording_manager.py       | 21 +++++++----
 2 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index ec1c3c4..0e368fb 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -68,6 +68,7 @@ class BackgroundCachingJob(object):
   """
   def __init__(self, pipeline_result, limiters):
     self._pipeline_result = pipeline_result
+    self._result_lock = threading.RLock()
     self._condition_checker = threading.Thread(
         target=self._background_caching_job_condition_checker, daemon=True)
 
@@ -77,7 +78,11 @@ class BackgroundCachingJob(object):
     self._condition_checker.start()
 
   def _background_caching_job_condition_checker(self):
-    while not PipelineState.is_terminal(self._pipeline_result.state):
+    while True:
+      with self._result_lock:
+        if PipelineState.is_terminal(self._pipeline_result.state):
+          break
+
       if self._should_end_condition_checker():
         self.cancel()
         break
@@ -87,30 +92,41 @@ class BackgroundCachingJob(object):
     return any([l.is_triggered() for l in self._limiters])
 
   def is_done(self):
-    is_terminated = self._pipeline_result.state in (
-        PipelineState.DONE, PipelineState.CANCELLED)
-    is_triggered = self._should_end_condition_checker()
-    is_cancelling = self._pipeline_result.state is PipelineState.CANCELLING
+    with self._result_lock:
+      is_terminated = self._pipeline_result.state in (
+          PipelineState.DONE, PipelineState.CANCELLED)
+      is_triggered = self._should_end_condition_checker()
+      is_cancelling = self._pipeline_result.state is PipelineState.CANCELLING
     return is_terminated or (is_triggered and is_cancelling)
 
   def is_running(self):
-    return self._pipeline_result.state is PipelineState.RUNNING
+    with self._result_lock:
+      return self._pipeline_result.state is PipelineState.RUNNING
 
   def cancel(self):
     """Cancels this background caching job.
     """
-    if not PipelineState.is_terminal(self._pipeline_result.state):
-      try:
-        self._pipeline_result.cancel()
-        self._pipeline_result.wait_until_finish()
-      except NotImplementedError:
-        # Ignore the cancel invocation if it is never implemented by the runner.
-        pass
+    with self._result_lock:
+      if not PipelineState.is_terminal(self._pipeline_result.state):
+        try:
+          self._pipeline_result.cancel()
+          # self._pipeline_result.wait_until_finish()
+        except NotImplementedError:
+          # Ignore the cancel invocation if it is never implemented by the
+          # runner.
+          pass
+
+  @property
+  def state(self):
+    with self._result_lock:
+      return self._pipeline_result.state
 
 
 def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
   """Attempts to run a background caching job for a user-defined pipeline.
 
+  Returns True if a job was started, False otherwise.
+
   The pipeline result is automatically tracked by Interactive Beam in case
   future cancellation/cleanup is needed.
   """
@@ -134,6 +150,8 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
     ie.current_env().set_background_caching_job(
         user_pipeline,
         BackgroundCachingJob(background_caching_job_result, limiters=limiters))
+    return True
+  return False
 
 
 def is_background_caching_job_needed(user_pipeline):
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index a5ce082..f67eb46 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -149,6 +149,7 @@ class Recording:
 
     self._user_pipeline = user_pipeline
     self._result = result
+    self._result_lock = threading.Lock()
     self._pcolls = pcolls
 
     pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
@@ -182,13 +183,18 @@ class Recording:
       return
 
     while not PipelineState.is_terminal(self._result.state):
-      if time.time() - self._start >= self._duration_secs:
-        self._result.cancel()
-        self._result.wait_until_finish()
+      with self._result_lock:
+        bcj = ie.current_env().get_background_caching_job(self._user_pipeline)
+        if bcj and bcj.is_done():
+          self._result.wait_until_finish()
 
-      elif all(s.is_done() for s in self._streams.values()):
-        self._result.cancel()
-        self._result.wait_until_finish()
+        elif time.time() - self._start >= self._duration_secs:
+          self._result.cancel()
+          self._result.wait_until_finish()
+
+        elif all(s.is_done() for s in self._streams.values()):
+          self._result.cancel()
+          self._result.wait_until_finish()
 
       time.sleep(0.1)
 
@@ -225,7 +231,8 @@ class Recording:
     # type: () -> None
 
     """Cancels the recording."""
-    self._result.cancel()
+    with self._result_lock:
+      self._result.cancel()
 
   def wait_until_finish(self):
     # type: () -> None

Reply via email to