This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b29102e [BEAM-11070] Use self-checkpoint to enforce finalization happens. new 9b51d4b Merge pull request #13338 from [BEAM-11070] Use self-checkpoint to enforce finalization happens. b29102e is described below commit b29102ed4c4186964cd281edf7851898535cfe02 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Fri Nov 13 12:06:20 2020 -0800 [BEAM-11070] Use self-checkpoint to enforce finalization happens. --- .../runners/portability/flink_runner_test.py | 3 +- .../portability/fn_api_runner/fn_runner_test.py | 81 ++++++++++------------ 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 303fe96..12ed1b8 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -442,7 +442,8 @@ class FlinkRunnerTestStreaming(FlinkRunnerTest): super(FlinkRunnerTest, self).test_callbacks_with_exception() def test_register_finalizations(self): - raise unittest.SkipTest("BEAM-11070") + self.enable_commit = True + super(FlinkRunnerTest, self).test_register_finalizations() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 91343b5..18a30e9 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -23,7 +23,6 @@ import collections import logging import os import random -import shutil import sys import tempfile import threading @@ -835,23 +834,41 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(res, equal_to(['1', '2'])) def test_register_finalizations(self): - event_recorder = EventRecorder(tempfile.gettempdir()) 
- elements_list = ['2', '1'] + class FinalizableSplittableDoFn(beam.DoFn): + was_finalized = False + + def set_finalized(self): + self.was_finalized = True - class FinalizableDoFn(beam.DoFn): def process( - self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam): - bundle_finalizer.register(lambda: event_recorder.record(element)) - yield element + self, + element, + bundle_finalizer=beam.DoFn.BundleFinalizerParam, + restriction_tracker=beam.DoFn.RestrictionParam( + OffsetRangeProvider(use_bounded_offset_range=True))): + # We use SDF to enforce finalization call happens by using + # self-initiated checkpoint. + if self.was_finalized: + restriction_tracker.try_claim( + restriction_tracker.current_restriction().start) + yield element + restriction_tracker.try_claim(element) + return + if restriction_tracker.try_claim( + restriction_tracker.current_restriction().start): + bundle_finalizer.register(lambda: self.set_finalized()) + # We sleep here instead of setting a resume time since the resume time + # doesn't need to be honored. 
+ time.sleep(1) + restriction_tracker.defer_remainder() with self.create_pipeline() as p: - res = (p | beam.Create(elements_list) | beam.ParDo(FinalizableDoFn())) - - assert_that(res, equal_to(elements_list)) - - results = event_recorder.events() - event_recorder.cleanup() - self.assertEqual(results, sorted(elements_list)) + max_retries = 100 + res = ( + p + | beam.Create([max_retries]) + | beam.ParDo(FinalizableSplittableDoFn())) + assert_that(res, equal_to([max_retries])) def test_sdf_synthetic_source(self): common_attrs = { @@ -1331,6 +1348,9 @@ class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest): def test_sdf_with_watermark_tracking(self): raise unittest.SkipTest("This test is for a single worker only.") + def test_register_finalizations(self): + raise unittest.SkipTest("This test is for a single worker only.") + class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest): def create_pipeline(self, is_drain=False): @@ -1355,6 +1375,9 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest): def test_sdf_with_watermark_tracking(self): raise unittest.SkipTest("This test is for a single worker only.") + def test_register_finalizations(self): + raise unittest.SkipTest("This test is for a single worker only.") + class FnApiRunnerTestWithBundleRepeat(FnApiRunnerTest): def create_pipeline(self, is_drain=False): @@ -1684,36 +1707,6 @@ def _unpickle_element_counter(name): return _pickled_element_counters[name] -class EventRecorder(object): - """Used to be registered as a callback in bundle finalization. - - The reason why records are written into a tmp file is, the in-memory dataset - cannot keep callback records when passing into one DoFn. 
- """ - def __init__(self, tmp_dir): - self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex) - os.mkdir(self.tmp_dir) - - def record(self, content): - file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt') - with open(file_path, 'w') as f: - f.write(content) - - def events(self): - content = [] - record_files = [ - f for f in os.listdir(self.tmp_dir) - if os.path.isfile(os.path.join(self.tmp_dir, f)) - ] - for file in record_files: - with open(os.path.join(self.tmp_dir, file), 'r') as f: - content.append(f.read()) - return sorted(content) - - def cleanup(self): - shutil.rmtree(self.tmp_dir) - - class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): """A RestrictionProvider that used for sdf related tests.""" def initial_restriction(self, element):