This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f9054  Use EventRecorder instead of relying on class var.
     new 46ac3bc  Merge pull request #13478 from [BEAM-11070]Use EventRecorder 
instead of relying on class var.
e4f9054 is described below

commit e4f90544a881df2116bc32f7c42987e4f3e01c9d
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Thu Dec 3 12:01:51 2020 -0800

    Use EventRecorder instead of relying on class var.
---
 .../portability/fn_api_runner/fn_runner_test.py    | 59 ++++++++++++++++++----
 1 file changed, 50 insertions(+), 9 deletions(-)

diff --git 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 4f7fc27..05bd0d2 100644
--- 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ 
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -23,6 +23,7 @@ import collections
 import logging
 import os
 import random
+import shutil
 import sys
 import tempfile
 import threading
@@ -881,21 +882,19 @@ class FnApiRunnerTest(unittest.TestCase):
       assert_that(res, equal_to(['1', '2']))
 
   def test_register_finalizations(self):
-    class FinalizableSplittableDoFn(beam.DoFn):
-      was_finalized = False
-
-      def set_finalized(self):
-        self.was_finalized = True
+    event_recorder = EventRecorder(tempfile.gettempdir())
 
+    class FinalizableSplittableDoFn(beam.DoFn):
       def process(
           self,
           element,
           bundle_finalizer=beam.DoFn.BundleFinalizerParam,
           restriction_tracker=beam.DoFn.RestrictionParam(
-              OffsetRangeProvider(use_bounded_offset_range=True))):
+              OffsetRangeProvider(
+                  use_bounded_offset_range=True, checkpoint_only=True))):
         # We use SDF to enforce finalization call happens by using
         # self-initiated checkpoint.
-        if self.was_finalized:
+        if 'finalized' in event_recorder.events():
           restriction_tracker.try_claim(
               restriction_tracker.current_restriction().start)
           yield element
@@ -903,7 +902,7 @@ class FnApiRunnerTest(unittest.TestCase):
           return
         if restriction_tracker.try_claim(
             restriction_tracker.current_restriction().start):
-          bundle_finalizer.register(lambda: self.set_finalized())
+          bundle_finalizer.register(lambda: event_recorder.record('finalized'))
           # We sleep here instead of setting a resume time since the resume 
time
           # doesn't need to be honored.
           time.sleep(1)
@@ -917,6 +916,8 @@ class FnApiRunnerTest(unittest.TestCase):
           | beam.ParDo(FinalizableSplittableDoFn()))
       assert_that(res, equal_to([max_retries]))
 
+    event_recorder.cleanup()
+
   def test_sdf_synthetic_source(self):
     common_attrs = {
         'key_size': 1,
@@ -1763,6 +1764,36 @@ def _unpickle_element_counter(name):
   return _pickled_element_counters[name]
 
 
+class EventRecorder(object):
+  """Used to be registered as a callback in bundle finalization.
+
+  The reason why records are written into a tmp file is, the in-memory dataset
+  cannot keep callback records when passing into one DoFn.
+  """
+  def __init__(self, tmp_dir):
+    self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex)
+    os.mkdir(self.tmp_dir)
+
+  def record(self, content):
+    file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt')
+    with open(file_path, 'w') as f:
+      f.write(content)
+
+  def events(self):
+    content = []
+    record_files = [
+        f for f in os.listdir(self.tmp_dir)
+        if os.path.isfile(os.path.join(self.tmp_dir, f))
+    ]
+    for file in record_files:
+      with open(os.path.join(self.tmp_dir, file), 'r') as f:
+        content.append(f.read())
+    return sorted(content)
+
+  def cleanup(self):
+    shutil.rmtree(self.tmp_dir)
+
+
 class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
   """A RestrictionProvider that used for sdf related tests."""
   def initial_restriction(self, element):
@@ -1786,13 +1817,23 @@ class UnboundedOffsetRestrictionTracker(
 
 
 class OffsetRangeProvider(beam.transforms.core.RestrictionProvider):
-  def __init__(self, use_bounded_offset_range):
+  def __init__(self, use_bounded_offset_range, checkpoint_only=False):
     self.use_bounded_offset_range = use_bounded_offset_range
+    self.checkpoint_only = checkpoint_only
 
   def initial_restriction(self, element):
     return restriction_trackers.OffsetRange(0, element)
 
   def create_tracker(self, restriction):
+    if self.checkpoint_only:
+
+      class CheckpointOnlyOffsetRestrictionTracker(
+          restriction_trackers.OffsetRestrictionTracker):
+        def try_split(self, unused_fraction_of_remainder):
+          return super(CheckpointOnlyOffsetRestrictionTracker,
+                       self).try_split(0.0)
+
+      return CheckpointOnlyOffsetRestrictionTracker(restriction)
     if self.use_bounded_offset_range:
       return restriction_trackers.OffsetRestrictionTracker(restriction)
     return UnboundedOffsetRestrictionTracker(restriction)

Reply via email to