[ 
https://issues.apache.org/jira/browse/BEAM-8298?focusedWorklogId=378512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378512
 ]

ASF GitHub Bot logged work on BEAM-8298:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jan/20 22:08
            Start Date: 28/Jan/20 22:08
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #10705: [BEAM-8298] Implement side input caching.
URL: https://github.com/apache/beam/pull/10705#discussion_r372080828
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
 ##########
 @@ -825,43 +833,92 @@ def extend(self,
 
   def clear(self, state_key, is_cached=False):
     # type: (beam_fn_api_pb2.StateKey, bool) -> _Future
-    if self._should_be_cached(is_cached):
+    cache_token = self._get_cache_token(state_key, is_cached)
+    if cache_token:
       cache_key = self._convert_to_cache_key(state_key)
-      self._state_cache.clear(cache_key, self._context.cache_token)
+      self._state_cache.clear(cache_key, cache_token)
     return self._underlying.clear(state_key)
 
   def done(self):
     # type: () -> None
     self._underlying.done()
 
-  def _materialize_iter(self,
-                        state_key,  # type: beam_fn_api_pb2.StateKey
-                        coder  # type: coder_impl.CoderImpl
-                       ):
+  def _lazy_iterator(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder,  # type: coder_impl.CoderImpl
+      continuation_token=None  # type: Optional[bytes]
+    ):
     # type: (...) -> Iterator[Any]
     """Materializes the state lazily, one element at a time.
        :return A generator which returns the next element if advanced.
     """
-    continuation_token = None
     while True:
-      data, continuation_token = \
-          self._underlying.get_raw(state_key, continuation_token)
+      data, continuation_token = (
+          self._underlying.get_raw(state_key, continuation_token))
       input_stream = coder_impl.create_InputStream(data)
       while input_stream.size() > 0:
         yield coder.decode_from_stream(input_stream, True)
       if not continuation_token:
         break
 
-  def _should_be_cached(self, request_is_cached):
-    return (self._state_cache.is_cache_enabled() and
-            request_is_cached and
-            self._context.cache_token)
+  def _get_cache_token(self, state_key, request_is_cached):
+    if not self._state_cache.is_cache_enabled():
+      return None
+    elif state_key.HasField('bag_user_state'):
+      if request_is_cached and self._context.user_state_cache_token:
+        return self._context.user_state_cache_token
+      else:
+        return self._context.bundle_cache_token
+    elif state_key.WhichOneof('type').endswith('_side_input'):
+      side_input = getattr(state_key, state_key.WhichOneof('type'))
+      return self._context.side_input_cache_tokens.get(
+        (side_input.transform_id, side_input.side_input_id),
+        self._context.bundle_cache_token)
+
+  def _partially_cached_iterable(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder  # type: coder_impl.CoderImpl
+    ):
+    # type: (...) -> Iterable[Any]
 +    """Materializes the first page of data, concatenated with a lazy iterable
+    of the rest, if any.
+    """
+    data, continuation_token = (
+            self._underlying.get_raw(state_key, None))
+    head = []
+    input_stream = coder_impl.create_InputStream(data)
+    while input_stream.size() > 0:
+      head.append(coder.decode_from_stream(input_stream, True))
+
+    if continuation_token is None:
+      return head
+    else:
+      def iter_func():
+        for item in head:
+          yield item
+        for item in self._lazy_iterator(state_key, coder, continuation_token):
+          yield item
+      return _IterableFromIterator(iter_func)
 
   @staticmethod
   def _convert_to_cache_key(state_key):
     return state_key.SerializeToString()
 
 
+class _IterableFromIterator(object):
+  """Wraps an iterator as an iterable."""
+  def __init__(self, iter_func):
+    self._iter_func = iter_func
+  def __iter__(self):
+    return iter_func()
 
 Review comment:
   ```suggestion
       return iter_func
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 378512)
    Time Spent: 1h 50m  (was: 1h 40m)

> Implement state caching for side inputs
> ---------------------------------------
>
>                 Key: BEAM-8298
>                 URL: https://issues.apache.org/jira/browse/BEAM-8298
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core, sdk-py-harness
>            Reporter: Maximilian Michels
>            Assignee: Jing Chen
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Caching is currently only implemented for user state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to