[ 
https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306402
 ]

ASF GitHub Bot logged work on BEAM-5428:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Sep/19 16:07
            Start Date: 04/Sep/19 16:07
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #9418: [BEAM-5428] 
Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r320846304
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/statecache.py
 ##########
 @@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A module for caching state reads/writes in Beam applications."""
+from __future__ import absolute_import
+
+import collections
+from threading import Lock
+
+
+class StateCache(object):
+  """ Cache for Beam state access, scoped by state key and cache_token.
+
+  For a given state_key, caches a (cache_token, value) tuple and allows to
+    a) read from the cache,
+           if the currently stored cache_token matches the provided
+    a) write to the cache,
+           storing the new value alongside with a cache token
+    c) append to the cache cache,
+           if the currently stored cache_token matches the provided
+
+  The operations on the cache are thread-safe for use by multiple workers.
+  """
+
+  def __init__(self, max_entries):
+    self._cache = self.LRUCache(max_entries, (None, None))
+    self._lock = Lock()
+
+  def get(self, state_key, cache_token):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+    return value if token == cache_token else None
+
+  def put(self, state_key, cache_token, value):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      return self._cache.put(state_key, (cache_token, value))
+
+  def append(self, state_key, cache_token, elements):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+      if token != cache_token:
+        # Discard cached state if tokens do not match
+        value = []
 
 Review comment:
   Good catch. Have change it to expunge the cache in this case.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 306402)
    Time Spent: 15h 20m  (was: 15h 10m)

> Implement cross-bundle state caching.
> -------------------------------------
>
>                 Key: BEAM-5428
>                 URL: https://issues.apache.org/jira/browse/BEAM-5428
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>            Reporter: Robert Bradshaw
>            Assignee: Rakesh Kumar
>            Priority: Major
>          Time Spent: 15h 20m
>  Remaining Estimate: 0h
>
> Tech spec: 
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document: 
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link: 
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to