This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e4f9054 Use EventRecorder instead of relying on class var. new 46ac3bc Merge pull request #13478 from [BEAM-11070]Use EventRecorder instead of relying on class var. e4f9054 is described below commit e4f90544a881df2116bc32f7c42987e4f3e01c9d Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Thu Dec 3 12:01:51 2020 -0800 Use EventRecorder instead of relying on class var. --- .../portability/fn_api_runner/fn_runner_test.py | 59 ++++++++++++++++++---- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 4f7fc27..05bd0d2 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -23,6 +23,7 @@ import collections import logging import os import random +import shutil import sys import tempfile import threading @@ -881,21 +882,19 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(res, equal_to(['1', '2'])) def test_register_finalizations(self): - class FinalizableSplittableDoFn(beam.DoFn): - was_finalized = False - - def set_finalized(self): - self.was_finalized = True + event_recorder = EventRecorder(tempfile.gettempdir()) + class FinalizableSplittableDoFn(beam.DoFn): def process( self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam, restriction_tracker=beam.DoFn.RestrictionParam( - OffsetRangeProvider(use_bounded_offset_range=True))): + OffsetRangeProvider( + use_bounded_offset_range=True, checkpoint_only=True))): # We use SDF to enforce finalization call happens by using # self-initiated checkpoint. 
- if self.was_finalized: + if 'finalized' in event_recorder.events(): restriction_tracker.try_claim( restriction_tracker.current_restriction().start) yield element @@ -903,7 +902,7 @@ class FnApiRunnerTest(unittest.TestCase): return if restriction_tracker.try_claim( restriction_tracker.current_restriction().start): - bundle_finalizer.register(lambda: self.set_finalized()) + bundle_finalizer.register(lambda: event_recorder.record('finalized')) # We sleep here instead of setting a resume time since the resume time # doesn't need to be honored. time.sleep(1) @@ -917,6 +916,8 @@ class FnApiRunnerTest(unittest.TestCase): | beam.ParDo(FinalizableSplittableDoFn())) assert_that(res, equal_to([max_retries])) + event_recorder.cleanup() + def test_sdf_synthetic_source(self): common_attrs = { 'key_size': 1, @@ -1763,6 +1764,36 @@ def _unpickle_element_counter(name): return _pickled_element_counters[name] +class EventRecorder(object): + """Used to be registered as a callback in bundle finalization. + + The reason why records are written into a tmp file is, the in-memory dataset + cannot keep callback records when passing into one DoFn. 
+ """ + def __init__(self, tmp_dir): + self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex) + os.mkdir(self.tmp_dir) + + def record(self, content): + file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt') + with open(file_path, 'w') as f: + f.write(content) + + def events(self): + content = [] + record_files = [ + f for f in os.listdir(self.tmp_dir) + if os.path.isfile(os.path.join(self.tmp_dir, f)) + ] + for file in record_files: + with open(os.path.join(self.tmp_dir, file), 'r') as f: + content.append(f.read()) + return sorted(content) + + def cleanup(self): + shutil.rmtree(self.tmp_dir) + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): """A RestrictionProvider that used for sdf related tests.""" def initial_restriction(self, element): @@ -1786,13 +1817,23 @@ class UnboundedOffsetRestrictionTracker( class OffsetRangeProvider(beam.transforms.core.RestrictionProvider): - def __init__(self, use_bounded_offset_range): + def __init__(self, use_bounded_offset_range, checkpoint_only=False): self.use_bounded_offset_range = use_bounded_offset_range + self.checkpoint_only = checkpoint_only def initial_restriction(self, element): return restriction_trackers.OffsetRange(0, element) def create_tracker(self, restriction): + if self.checkpoint_only: + + class CheckpointOnlyOffsetRestrictionTracker( + restriction_trackers.OffsetRestrictionTracker): + def try_split(self, unused_fraction_of_remainder): + return super(CheckpointOnlyOffsetRestrictionTracker, + self).try_split(0.0) + + return CheckpointOnlyOffsetRestrictionTracker(restriction) if self.use_bounded_offset_range: return restriction_trackers.OffsetRestrictionTracker(restriction) return UnboundedOffsetRestrictionTracker(restriction)