This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9c60fd5 Revert "[BEAM-2914] Add portable merging window support to Python. (#12995)" new 871e023 Merge pull request #14004 from ajamato/rollback 9c60fd5 is described below commit 9c60fd546458f48912285d38bce52a31d2eb4472 Author: Alex Amato <ajam...@google.com> AuthorDate: Wed Feb 17 11:04:53 2021 -0800 Revert "[BEAM-2914] Add portable merging window support to Python. (#12995)" This reverts commit 625ee1f6e27636f26672e973ecbcecf19a8cb361. --- .../runners/portability/flink_runner_test.py | 3 - .../runners/portability/fn_api_runner/execution.py | 241 +-------------------- .../portability/fn_api_runner/fn_runner_test.py | 37 ---- .../runners/portability/spark_runner_test.py | 3 - .../apache_beam/runners/worker/bundle_processor.py | 43 ---- 5 files changed, 12 insertions(+), 315 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index e97ce49..94e30bf 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -398,9 +398,6 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): def test_register_finalizations(self): raise unittest.SkipTest("BEAM-11021") - def test_custom_merging_window(self): - raise unittest.SkipTest("BEAM-11004") - # Inherits all other tests. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index a08aa5f..bc69123 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -24,8 +24,6 @@ from __future__ import absolute_import import collections import copy import itertools -import uuid -import weakref from typing import TYPE_CHECKING from typing import Any from typing import DefaultDict @@ -57,7 +55,6 @@ from apache_beam.runners.portability.fn_api_runner.translations import only_elem from apache_beam.runners.portability.fn_api_runner.translations import split_buffer_id from apache_beam.runners.portability.fn_api_runner.translations import unique_name from apache_beam.runners.worker import bundle_processor -from apache_beam.transforms import core from apache_beam.transforms import trigger from apache_beam.transforms import window from apache_beam.transforms.window import GlobalWindow @@ -72,6 +69,7 @@ if TYPE_CHECKING: from apache_beam.runners.portability.fn_api_runner.fn_runner import DataOutput from apache_beam.runners.portability.fn_api_runner.fn_runner import OutputTimers from apache_beam.runners.portability.fn_api_runner.translations import DataSideInput + from apache_beam.transforms import core from apache_beam.transforms.window import BoundedWindow ENCODED_IMPULSE_VALUE = WindowedValueCoder( @@ -340,222 +338,6 @@ class GenericNonMergingWindowFn(window.NonMergingWindowFn): context.coders[window_coder_id.decode('utf-8')]) -class GenericMergingWindowFn(window.WindowFn): - - URN = 'internal-generic-merging' - - TO_SDK_TRANSFORM = 'read' - FROM_SDK_TRANSFORM = 'write' - - _HANDLES = {} # type: Dict[str, GenericMergingWindowFn] - - def __init__(self, execution_context, windowing_strategy_proto): - # type: (FnApiRunnerExecutionContext, beam_runner_api_pb2.WindowingStrategy) -> None - self._worker_handler = None # type: Optional[worker_handlers.WorkerHandler] - self._handle_id = handle_id = uuid.uuid4().hex - self._HANDLES[handle_id] = self - # ExecutionContexts are expensive, we don't want to keep them in the - # static dictionary forever. Instead we hold a weakref and pop self - # out of the dict once this context goes away. - self._execution_context_ref_obj = weakref.ref( - execution_context, lambda _: self._HANDLES.pop(handle_id, None)) - self._windowing_strategy_proto = windowing_strategy_proto - self._counter = 0 - # Lazily created in make_process_bundle_descriptor() - self._process_bundle_descriptor = None - self._bundle_processor_id = None # type: Optional[str] - self.windowed_input_coder_impl = None # type: Optional[CoderImpl] - self.windowed_output_coder_impl = None # type: Optional[CoderImpl] - - def _execution_context_ref(self): - # type: () -> FnApiRunnerExecutionContext - result = self._execution_context_ref_obj() - assert result is not None - return result - - def payload(self): - # type: () -> bytes - return self._handle_id.encode('utf-8') - - @staticmethod - @window.urns.RunnerApiFn.register_urn(URN, bytes) - def from_runner_api_parameter(handle_id, unused_context): - # type: (bytes, Any) -> GenericMergingWindowFn - return GenericMergingWindowFn._HANDLES[handle_id.decode('utf-8')] - - def assign(self, assign_context): - # type: (window.WindowFn.AssignContext) -> Iterable[window.BoundedWindow] - raise NotImplementedError() - - def merge(self, merge_context): - # type: (window.WindowFn.MergeContext) -> None - worker_handler = self.worker_handle() - - assert self.windowed_input_coder_impl is not None - assert self.windowed_output_coder_impl is not None - process_bundle_id = self.uid('process') - to_worker = worker_handler.data_conn.output_stream( - process_bundle_id, self.TO_SDK_TRANSFORM) - to_worker.write( - self.windowed_input_coder_impl.encode_nested( - window.GlobalWindows.windowed_value((b'', merge_context.windows)))) - to_worker.close() - - process_bundle_req = beam_fn_api_pb2.InstructionRequest( - instruction_id=process_bundle_id, - process_bundle=beam_fn_api_pb2.ProcessBundleRequest( - process_bundle_descriptor_id=self._bundle_processor_id)) - result_future = worker_handler.control_conn.push(process_bundle_req) - for output in worker_handler.data_conn.input_elements( - process_bundle_id, [self.FROM_SDK_TRANSFORM], - abort_callback=lambda: bool(result_future.is_done() and result_future. - get().error)): - if isinstance(output, beam_fn_api_pb2.Elements.Data): - windowed_result = self.windowed_output_coder_impl.decode_nested( - output.data) - for merge_result, originals in windowed_result.value[1][1]: - merge_context.merge(originals, merge_result) - else: - raise RuntimeError("Unexpected data: %s" % output) - - result = result_future.get() - if result.error: - raise RuntimeError(result.error) - # The result was "returned" via the merge callbacks on merge_context above. - - def get_window_coder(self): - # type: () -> coders.Coder - return self._execution_context_ref().pipeline_context.coders[ - self._windowing_strategy_proto.window_coder_id] - - def worker_handle(self): - # type: () -> worker_handlers.WorkerHandler - if self._worker_handler is None: - worker_handler_manager = self._execution_context_ref( - ).worker_handler_manager - self._worker_handler = worker_handler_manager.get_worker_handlers( - self._windowing_strategy_proto.environment_id, 1)[0] - process_bundle_decriptor = self.make_process_bundle_descriptor( - self._worker_handler.data_api_service_descriptor(), - self._worker_handler.state_api_service_descriptor()) - worker_handler_manager.register_process_bundle_descriptor( - process_bundle_decriptor) - return self._worker_handler - - def make_process_bundle_descriptor( - self, data_api_service_descriptor, state_api_service_descriptor): - # type: (Optional[endpoints_pb2.ApiServiceDescriptor], Optional[endpoints_pb2.ApiServiceDescriptor]) -> beam_fn_api_pb2.ProcessBundleDescriptor - - """Creates a ProcessBundleDescriptor for invoking the WindowFn's - merge operation. - """ - def make_channel_payload(coder_id): - # type: (str) -> bytes - data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id) - if data_api_service_descriptor: - data_spec.api_service_descriptor.url = (data_api_service_descriptor.url) - return data_spec.SerializeToString() - - pipeline_context = self._execution_context_ref().pipeline_context - global_windowing_strategy_id = self.uid('global_windowing_strategy') - global_windowing_strategy_proto = core.Windowing( - window.GlobalWindows()).to_runner_api(pipeline_context) - coders = dict(pipeline_context.coders.get_id_to_proto_map()) - - def make_coder(urn, *components): - # type: (str, str) -> str - coder_proto = beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.FunctionSpec(urn=urn), - component_coder_ids=components) - coder_id = self.uid('coder') - coders[coder_id] = coder_proto - pipeline_context.coders.put_proto(coder_id, coder_proto) - return coder_id - - bytes_coder_id = make_coder(common_urns.coders.BYTES.urn) - window_coder_id = self._windowing_strategy_proto.window_coder_id - global_window_coder_id = make_coder(common_urns.coders.GLOBAL_WINDOW.urn) - iter_window_coder_id = make_coder( - common_urns.coders.ITERABLE.urn, window_coder_id) - input_coder_id = make_coder( - common_urns.coders.KV.urn, bytes_coder_id, iter_window_coder_id) - output_coder_id = make_coder( - common_urns.coders.KV.urn, - bytes_coder_id, - make_coder( - common_urns.coders.KV.urn, - iter_window_coder_id, - make_coder( - common_urns.coders.ITERABLE.urn, - make_coder( - common_urns.coders.KV.urn, - window_coder_id, - iter_window_coder_id)))) - windowed_input_coder_id = make_coder( - common_urns.coders.WINDOWED_VALUE.urn, - input_coder_id, - global_window_coder_id) - windowed_output_coder_id = make_coder( - common_urns.coders.WINDOWED_VALUE.urn, - output_coder_id, - global_window_coder_id) - - self.windowed_input_coder_impl = pipeline_context.coders[ - windowed_input_coder_id].get_impl() - self.windowed_output_coder_impl = pipeline_context.coders[ - windowed_output_coder_id].get_impl() - - self._bundle_processor_id = self.uid('merge_windows') - return beam_fn_api_pb2.ProcessBundleDescriptor( - id=self._bundle_processor_id, - transforms={ - self.TO_SDK_TRANSFORM: beam_runner_api_pb2.PTransform( - unique_name='MergeWindows/Read', - spec=beam_runner_api_pb2.FunctionSpec( - urn=bundle_processor.DATA_INPUT_URN, - payload=make_channel_payload(windowed_input_coder_id)), - outputs={'input': 'input'}), - 'Merge': beam_runner_api_pb2.PTransform( - unique_name='MergeWindows/Merge', - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.primitives.MERGE_WINDOWS.urn, - payload=self._windowing_strategy_proto.window_fn. - SerializeToString()), - inputs={'input': 'input'}, - outputs={'output': 'output'}), - self.FROM_SDK_TRANSFORM: beam_runner_api_pb2.PTransform( - unique_name='MergeWindows/Write', - spec=beam_runner_api_pb2.FunctionSpec( - urn=bundle_processor.DATA_OUTPUT_URN, - payload=make_channel_payload(windowed_output_coder_id)), - inputs={'output': 'output'}), - }, - pcollections={ - 'input': beam_runner_api_pb2.PCollection( - unique_name='input', - windowing_strategy_id=global_windowing_strategy_id, - coder_id=input_coder_id), - 'output': beam_runner_api_pb2.PCollection( - unique_name='output', - windowing_strategy_id=global_windowing_strategy_id, - coder_id=output_coder_id), - }, - coders=coders, - windowing_strategies={ - global_windowing_strategy_id: global_windowing_strategy_proto, - }, - environments=dict( - self._execution_context_ref().pipeline_components.environments. - items()), - state_api_service_descriptor=state_api_service_descriptor, - timer_api_service_descriptor=data_api_service_descriptor) - - def uid(self, name=''): - # type: (str) -> str - self._counter += 1 - return '%s_%s_%s' % (self._handle_id, name, self._counter) - - class FnApiRunnerExecutionContext(object): """ :var pcoll_buffers: (dict): Mapping of @@ -661,22 +443,23 @@ class FnApiRunnerExecutionContext(object): windowing_strategy_proto = self.pipeline_components.windowing_strategies[id] if windowing_strategy_proto.window_fn.urn in SAFE_WINDOW_FNS: return id - else: + elif (windowing_strategy_proto.merge_status == + beam_runner_api_pb2.MergeStatus.NON_MERGING) or True: safe_id = id + '_safe' while safe_id in self.pipeline_components.windowing_strategies: safe_id += '_' safe_proto = copy.copy(windowing_strategy_proto) - if (windowing_strategy_proto.merge_status == - beam_runner_api_pb2.MergeStatus.NON_MERGING): - safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN - safe_proto.window_fn.payload = ( - windowing_strategy_proto.window_coder_id.encode('utf-8')) - else: - window_fn = GenericMergingWindowFn(self, windowing_strategy_proto) - safe_proto.window_fn.urn = GenericMergingWindowFn.URN - safe_proto.window_fn.payload = window_fn.payload() + safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN + safe_proto.window_fn.payload = ( + windowing_strategy_proto.window_coder_id.encode('utf-8')) self.pipeline_context.windowing_strategies.put_proto(safe_id, safe_proto) return safe_id + elif windowing_strategy_proto.window_fn.urn == python_urns.PICKLED_WINDOWFN: + return id + else: + raise NotImplementedError( + '[BEAM-10119] Unknown merging WindowFn: %s' % + windowing_strategy_proto) @property def state_servicer(self): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 912074e..7bbdd3c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -20,7 +20,6 @@ from __future__ import absolute_import from __future__ import print_function import collections -import gc import logging import os import random @@ -47,7 +46,6 @@ from tenacity import retry from tenacity import stop_after_attempt import apache_beam as beam -from apache_beam.coders import coders from apache_beam.coders.coders import StrUtf8Coder from apache_beam.io import restriction_trackers from apache_beam.io.watermark_estimators import ManualWatermarkEstimator @@ -782,21 +780,6 @@ class FnApiRunnerTest(unittest.TestCase): | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])])) - def test_custom_merging_window(self): - with self.create_pipeline() as p: - res = ( - p - | beam.Create([1, 2, 100, 101, 102]) - | beam.Map(lambda t: window.TimestampedValue(('k', t), t)) - | beam.WindowInto(CustomMergingWindowFn()) - | beam.GroupByKey() - | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) - assert_that( - res, equal_to([('k', [1]), ('k', [101]), ('k', [2, 100, 102])])) - gc.collect() - from apache_beam.runners.portability.fn_api_runner.execution import GenericMergingWindowFn - self.assertEqual(GenericMergingWindowFn._HANDLES, {}) - @unittest.skip('BEAM-9119: test is flaky') def test_large_elements(self): with self.create_pipeline() as p: @@ -2019,26 +2002,6 @@ class FnApiBasedStateBackedCoderTest(unittest.TestCase): assert_that(r, equal_to([VALUES_PER_ELEMENT * NUM_OF_ELEMENTS])) -# TODO(robertwb): Why does pickling break when this is inlined? -class CustomMergingWindowFn(window.WindowFn): - def assign(self, assign_context): - return [ - window.IntervalWindow( - assign_context.timestamp, assign_context.timestamp + 1) - ] - - def merge(self, merge_context): - evens = [w for w in merge_context.windows if w.start % 2 == 0] - if evens: - merge_context.merge( - evens, - window.IntervalWindow( - min(w.start for w in evens), max(w.end for w in evens))) - - def get_window_coder(self): - return coders.IntervalWindowCoder() - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py index 3473cad..062e06f 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py @@ -181,9 +181,6 @@ class SparkRunnerTest(portable_runner_test.PortableRunnerTest): super(SparkRunnerTest, self).test_flattened_side_input(with_transcoding=False) - def test_custom_merging_window(self): - raise unittest.SkipTest("BEAM-11004") - # Inherits all other tests from PortableRunnerTest. diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index f05228e..1e1c058 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -76,7 +76,6 @@ from apache_beam.transforms import TimeDomain from apache_beam.transforms import core from apache_beam.transforms import sideinputs from apache_beam.transforms import userstate -from apache_beam.transforms import window from apache_beam.utils import counters from apache_beam.utils import proto_utils from apache_beam.utils import timestamp @@ -1857,48 +1856,6 @@ def create_map_windows( factory, transform_id, transform_proto, consumers, MapWindows()) -@BeamTransformFactory.register_urn( - common_urns.primitives.MERGE_WINDOWS.urn, beam_runner_api_pb2.FunctionSpec) -def create_merge_windows( - factory, # type: BeamTransformFactory - transform_id, # type: str - transform_proto, # type: beam_runner_api_pb2.PTransform - mapping_fn_spec, # type: beam_runner_api_pb2.FunctionSpec - consumers # type: Dict[str, List[operations.Operation]] -): - assert mapping_fn_spec.urn == python_urns.PICKLED_WINDOWFN - window_fn = pickler.loads(mapping_fn_spec.payload) - - class MergeWindows(beam.DoFn): - def process(self, element): - nonce, windows = element - - original_windows = set(windows) # type: Set[window.BoundedWindow] - merged_windows = collections.defaultdict( - set - ) # type: MutableMapping[window.BoundedWindow, Set[window.BoundedWindow]] - - class RecordingMergeContext(window.WindowFn.MergeContext): - def merge( - self, - to_be_merged, # type: Iterable[window.BoundedWindow] - merge_result, # type: window.BoundedWindow - ): - originals = merged_windows[merge_result] - for window in to_be_merged: - if window in original_windows: - originals.add(window) - original_windows.remove(window) - else: - originals.update(merged_windows.pop(window)) - - window_fn.merge(RecordingMergeContext(windows)) - yield nonce, (original_windows, merged_windows.items()) - - return _create_simple_pardo_operation( - factory, transform_id, transform_proto, consumers, MergeWindows()) - - @BeamTransformFactory.register_urn(common_urns.primitives.TO_STRING.urn, None) def create_to_string_fn( factory, # type: BeamTransformFactory