lukecwik commented on a change in pull request #12062:
URL: https://github.com/apache/beam/pull/12062#discussion_r445074679



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java
##########
@@ -37,13 +37,19 @@
 /**
  * Holds user state in memory. Only one key is active at a time due to the GroupReduceFunction being
  * called once per key. Needs to be reset via {@code resetForNewKey()} before processing a new key.
+ *
+ * <p>In case of any failures, this factory must be discarded. Otherwise, the contained state cache
+ * token would be reused which would corrupt the state cache.
  */
 public class InMemoryBagUserStateFactory<K, V, W extends BoundedWindow>
     implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
 
+  private final ByteString cacheToken;
+
   private List<InMemorySingleKeyBagState> handlers;
 
   public InMemoryBagUserStateFactory() {
+    cacheToken = ByteString.copyFrom(UUID.randomUUID().toString().getBytes(Charsets.UTF_8));

Review comment:
       I think we have three options here: move the cache token generation up
into `ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter`, have
the factory own the cache token, or add a supplier method to
`StateRequestHandlers#forBagUserState`. Whichever we choose, the comment added
to this class should move to wherever the token ends up living.
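
To make the supplier option concrete, here is a minimal, standalone sketch. It is not Beam code: `CacheTokenSketch`, `BagStateFactorySketch`, `TokenOwnerSketch`, and `rotate()` are hypothetical names, and `byte[]` stands in for `ByteString` so the example needs only the JDK. It also assumes the owner may want to replace the token after a failure, which is one possible reading of the Javadoc added in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical illustration only; none of these types exist in Beam. */
final class CacheTokenSketch {

  /** Stand-in for a state factory that is handed a token source instead of creating one. */
  static final class BagStateFactorySketch {
    private final Supplier<byte[]> cacheTokenSupplier;

    BagStateFactorySketch(Supplier<byte[]> cacheTokenSupplier) {
      this.cacheTokenSupplier = cacheTokenSupplier;
    }

    /** Asks the supplier each time, so the factory never caches a stale token. */
    byte[] currentCacheToken() {
      return cacheTokenSupplier.get();
    }
  }

  /** Stand-in for whichever higher-level component ends up owning the token. */
  static final class TokenOwnerSketch implements Supplier<byte[]> {
    private byte[] token = newToken();

    private static byte[] newToken() {
      // Same generation scheme as the line under review, minus ByteString.
      return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public synchronized byte[] get() {
      return token.clone(); // defensive copy; callers cannot mutate the owner's token
    }

    /** Assumption: the owner swaps the token after a failure so old cache entries stop matching. */
    synchronized void rotate() {
      token = newToken();
    }
  }
}
```

With this shape the factory no longer needs to be discarded to get a fresh token; the "must be discarded on failure" note would then belong on the token owner instead.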



