lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966414091
##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -87,18 +88,46 @@ def get_referents_for_cache(*objs):
return rval
+class _LoadingValue(WeightedValue):
+ """Allows concurrent users of the cache to wait for a value to be loaded."""
+ def __init__(self):
+ # type: () -> None
+ super().__init__(None, 1)
+ self._wait_event = threading.Event()
+
+ def load(self, key, loading_fn):
+ # type: (Any, Callable[[Any], Any]) -> None
+ try:
+ self._value = loading_fn(key)
+ except Exception as err:
+ self._error = err
+ finally:
+ self._wait_event.set()
+
+ def value(self):
+ # type: () -> Any
+ self._wait_event.wait()
+ err = getattr(self, "_error", None)
+ if err:
+ raise err
+ return self._value
+
+
class StateCache(object):
"""Cache for Beam state access, scoped by state key and cache_token.
Assumes a bag state implementation.
- For a given state_key and cache_token, caches a value and allows to
- a) read from the cache (get),
- if the currently stored cache_token matches the provided
- b) write to the cache (put),
- storing the new value alongside with a cache token
- c) empty a cached element (clear),
- if the currently stored cache_token matches the provided
+ For a given key, caches a value and allows one to
+ a) peek at the cache (peek),
Review Comment:
Clarified in comments that the cache implements an LRU cache.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]