This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/partialRevert
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 747e7fa4071ada15b10b8a4b07e10f3b39d3a169
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Dec 8 13:49:20 2023 -0500

    Partially revert changes from #29587
---
 .../apache_beam/runners/worker/bundle_processor.py | 45 ----------------------
 1 file changed, 45 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b35997c4250..b677200d099 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -378,11 +378,6 @@ coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(
 
 
 class StateBackedSideInputMap(object):
-
-  _BULK_READ_LIMIT = 100
-  _BULK_READ_FULLY = "fully"
-  _BULK_READ_PARTIALLY = "partially"
-
   def __init__(self,
                state_handler,  # type: sdk_worker.CachingStateHandler
                transform_id,  # type: str
@@ -422,53 +417,13 @@ class StateBackedSideInputMap(object):
                 side_input_id=self._tag,
                 window=self._target_window_coder.encode(target_window),
                 key=b''))
-        kv_iter_state_key = beam_fn_api_pb2.StateKey(
-            multimap_keys_values_side_input=beam_fn_api_pb2.StateKey.
-            MultimapKeysValuesSideInput(
-                transform_id=self._transform_id,
-                side_input_id=self._tag,
-                window=self._target_window_coder.encode(target_window)))
         cache = {}
         key_coder = self._element_coder.key_coder()
         key_coder_impl = key_coder.get_impl()
         value_coder = self._element_coder.value_coder()
 
         class MultiMap(object):
-          _bulk_read = None
-          _lock = threading.Lock()
-
           def __getitem__(self, key):
-            if self._bulk_read is None:
-              with self._lock:
-                if self._bulk_read is None:
-                  try:
-                    # Attempt to bulk read the key-values over the iterable
-                    # protocol which, if supported, can be much more efficient
-                    # than point lookups if it fits into memory.
-                    for ix, (k, vs) in enumerate(_StateBackedIterable(
-                        state_handler,
-                        kv_iter_state_key,
-                        coders.TupleCoder(
-                            (key_coder, coders.IterableCoder(value_coder))))):
-                      cache[k] = vs
-                      if ix > StateBackedSideInputMap._BULK_READ_LIMIT:
-                        self._bulk_read = (
-                            StateBackedSideInputMap._BULK_READ_PARTIALLY)
-                        break
-                    else:
-                      # We reached the end of the iteration without breaking.
-                      self._bulk_read = (
-                          StateBackedSideInputMap._BULK_READ_FULLY)
-                  except Exception:
-                    _LOGGER.error(
-                        "Iterable access of map side inputs unsupported.",
-                        exc_info=True)
-                    self._bulk_read = (
-                        StateBackedSideInputMap._BULK_READ_PARTIALLY)
-
-            if (self._bulk_read == StateBackedSideInputMap._BULK_READ_FULLY):
-              return cache.get(key, [])
-
             if key not in cache:
               keyed_state_key = beam_fn_api_pb2.StateKey()
               keyed_state_key.CopyFrom(state_key)

Reply via email to