seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1038557202


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : 
signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different 
ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
-        }
-
+        final String userStateId = state.id();
+        final String escapedParDoName =
+            
SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+        final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, 
escapedParDoName);

Review Comment:
   Now that we removed the rewriting config logic, we no longer have 
"userStateIdToStoreIdMap". The change is that we have the nonUniqueStateId set 
from StateIdParser.scan(). Then, given this set, we'll create the 
stateIdToStoreMapping for each ParDo and initialize the 
SamzaStateStoreInternals, while keeping the stateId-> store map in "stores". 
Please take a look and let me know if this looks reasonable. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to