This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/partialRevert
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 747e7fa4071ada15b10b8a4b07e10f3b39d3a169
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Dec 8 13:49:20 2023 -0500

    Partially revert changes from #29587
---
 .../apache_beam/runners/worker/bundle_processor.py | 45 ----------------------
 1 file changed, 45 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b35997c4250..b677200d099 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -378,11 +378,6 @@ coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(
 
 
 class StateBackedSideInputMap(object):
-
-  _BULK_READ_LIMIT = 100
-  _BULK_READ_FULLY = "fully"
-  _BULK_READ_PARTIALLY = "partially"
-
   def __init__(self,
                state_handler,  # type: sdk_worker.CachingStateHandler
                transform_id,  # type: str
@@ -422,53 +417,13 @@ class StateBackedSideInputMap(object):
             side_input_id=self._tag,
             window=self._target_window_coder.encode(target_window),
             key=b''))
-    kv_iter_state_key = beam_fn_api_pb2.StateKey(
-        multimap_keys_values_side_input=beam_fn_api_pb2.StateKey.
-        MultimapKeysValuesSideInput(
-            transform_id=self._transform_id,
-            side_input_id=self._tag,
-            window=self._target_window_coder.encode(target_window)))
     cache = {}
     key_coder = self._element_coder.key_coder()
     key_coder_impl = key_coder.get_impl()
     value_coder = self._element_coder.value_coder()
 
     class MultiMap(object):
-      _bulk_read = None
-      _lock = threading.Lock()
-
       def __getitem__(self, key):
-        if self._bulk_read is None:
-          with self._lock:
-            if self._bulk_read is None:
-              try:
-                # Attempt to bulk read the key-values over the iterable
-                # protocol which, if supported, can be much more efficient
-                # than point lookups if it fits into memory.
-                for ix, (k, vs) in enumerate(_StateBackedIterable(
-                    state_handler,
-                    kv_iter_state_key,
-                    coders.TupleCoder(
-                        (key_coder, coders.IterableCoder(value_coder))))):
-                  cache[k] = vs
-                  if ix > StateBackedSideInputMap._BULK_READ_LIMIT:
-                    self._bulk_read = (
-                        StateBackedSideInputMap._BULK_READ_PARTIALLY)
-                    break
-                else:
-                  # We reached the end of the iteration without breaking.
-                  self._bulk_read = (
-                      StateBackedSideInputMap._BULK_READ_FULLY)
-              except Exception:
-                _LOGGER.error(
-                    "Iterable access of map side inputs unsupported.",
-                    exc_info=True)
-                self._bulk_read = (
-                    StateBackedSideInputMap._BULK_READ_PARTIALLY)
-
-        if (self._bulk_read == StateBackedSideInputMap._BULK_READ_FULLY):
-          return cache.get(key, [])
-
         if key not in cache:
           keyed_state_key = beam_fn_api_pb2.StateKey()
           keyed_state_key.CopyFrom(state_key)
