gemini-code-assist[bot] commented on code in PR #36313:
URL: https://github.com/apache/beam/pull/36313#discussion_r2450556684


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -420,11 +393,16 @@ public <T extends State> Optional<T> get(StateNamespace namespace, StateTag<T> a
               localCache.computeIfAbsent(
                   new StateId(forKey, stateFamily, namespace), stateCache::getIfPresent));
 
-      return stateCacheEntry.flatMap(entry -> entry.get(namespace, address));
+      return stateCacheEntry.flatMap(entry -> entry.get(encodedAddress));
+    }
+
+    public <T extends State> void put(
+        StateNamespace namespace, StateTag<?> tag, T value, long weight) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   For better type safety, consider tightening the type of the `tag` parameter from `StateTag<?>` to `StateTag<? extends T>`. This would ensure at compile time that the provided state tag is compatible with the state value `T`.
   
   ```suggestion
           StateNamespace namespace, StateTag<? extends T> tag, T value, long weight) {
   ```
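
To make the effect of the suggested signature concrete, here is a minimal, self-contained sketch. Every type in it (`State`, `BagLike`, `ValueLike`, `Tag`, `CACHE`) is a hypothetical stand-in, not Beam's actual class. One caveat it illustrates: with `Tag<? extends T>`, Java's inference may widen `T` to a common supertype, so a mismatched call can still compile unless the type argument is pinned explicitly. A `Tag<T>` parameter is shown as a stricter alternative; whether that fits the real call sites is not something this sketch can confirm.

```java
import java.util.HashMap;
import java.util.Map;

public class TagTypingSketch {
  // Hypothetical stand-ins for illustration only; these are not Beam's types.
  interface State {}

  static final class BagLike implements State {}

  static final class ValueLike implements State {}

  static final class Tag<S extends State> {
    final String id;

    Tag(String id) {
      this.id = id;
    }
  }

  static final Map<String, State> CACHE = new HashMap<>();

  // Shape of the suggested signature: the tag's state type must be a subtype of T.
  static <T extends State> void put(Tag<? extends T> tag, T value) {
    CACHE.put(tag.id, value);
  }

  // Stricter alternative: the tag fixes T exactly, and the value must be a T.
  static <T extends State> void putExact(Tag<T> tag, T value) {
    CACHE.put(tag.id, value);
  }

  public static void main(String[] args) {
    Tag<BagLike> bagTag = new Tag<>("bag");

    // Matching tag and value: T is inferred as BagLike.
    put(bagTag, new BagLike());

    // With the type argument pinned, a mismatched value is rejected:
    // TagTypingSketch.<BagLike>put(bagTag, new ValueLike()); // does not compile

    // With inference alone, T widens to State, so this call still compiles.
    put(bagTag, new ValueLike());

    // The exact-tag variant rejects the same mismatch even under inference:
    // putExact(bagTag, new ValueLike()); // does not compile

    System.out.println(CACHE.get("bag").getClass().getSimpleName());
  }
}
```

In this sketch the wildcard form only catches the mismatch when the caller spells out the type argument, so it may be worth checking how the real call sites invoke `put` before choosing between the two shapes.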



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java:
##########
@@ -61,19 +62,14 @@ class WindmillCombiningState<InputT, AccumT, OutputT> extends WindmillState
       boolean isNewKey,
       WindmillStateTagUtil windmillStateTagUtil) {
     StateTag<BagState<AccumT>> internalBagAddress = StateTags.convertToBagTagInternal(address);
+    InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress);
+
     this.bag =
         cache
-            .get(namespace, internalBagAddress)
+            .get(namespace, encodeKey)
             .map(state -> (WindmillBag<AccumT>) state)
             .orElseGet(
-                () ->
-                    new WindmillBag<>(
-                        namespace,
-                        internalBagAddress,
-                        stateFamily,
-                        accumCoder,
-                        isNewKey,
-                        windmillStateTagUtil));
+                () -> new WindmillBag<>(namespace, encodeKey, stateFamily, accumCoder, isNewKey));

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The refactoring to pre-encode the state key in `CachingStateTable` seems incomplete for `CombiningState`. This constructor still accepts `WindmillStateTagUtil` and performs key encoding internally, which is inconsistent with the other state types in this PR.
   
   For consistency, consider the following:
   1.  Modify this constructor to accept the pre-encoded `InternedByteString` key.
   2.  Remove the `WindmillStateTagUtil` parameter.
   3.  Update `CachingStateTable.bindCombiningValue` to perform the key encoding and pass the `InternedByteString` to this constructor, similar to how other `bind*` methods have been updated.
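
The three steps above can be sketched in isolation as follows. This is only an illustration under stated assumptions: `EncodedKey`, `KeyEncoder`, `CombiningStateSketch`, and `StateTableSketch` are hypothetical stand-ins for `InternedByteString`, `WindmillStateTagUtil`, `WindmillCombiningState`, and `CachingStateTable`, and the `namespace+tagId` encoding is invented for the example. In the diff above the key is encoded from the internal bag address, so the real caller would presumably also have to do that tag conversion before encoding.

```java
import java.nio.charset.StandardCharsets;

public class PreEncodedKeySketch {
  // Hypothetical stand-in for a pre-encoded state key.
  static final class EncodedKey {
    private final byte[] bytes;

    EncodedKey(byte[] bytes) {
      this.bytes = bytes.clone();
    }

    byte[] bytes() {
      return bytes.clone();
    }
  }

  // Hypothetical encoder; the call counter exists only so the sketch can show
  // that encoding happens once, in the table.
  static final class KeyEncoder {
    int calls = 0;

    EncodedKey encode(String namespace, String tagId) {
      calls++;
      // Invented encoding, for illustration only.
      return new EncodedKey((namespace + "+" + tagId).getBytes(StandardCharsets.UTF_8));
    }
  }

  // Steps 1 and 2: the state object takes the encoded key and no encoder.
  static final class CombiningStateSketch {
    final EncodedKey bagKey;

    CombiningStateSketch(EncodedKey bagKey) {
      this.bagKey = bagKey;
    }
  }

  // Step 3: the table owns the encoder and encodes before constructing the state.
  static final class StateTableSketch {
    private final KeyEncoder encoder;

    StateTableSketch(KeyEncoder encoder) {
      this.encoder = encoder;
    }

    CombiningStateSketch bindCombining(String namespace, String tagId) {
      EncodedKey key = encoder.encode(namespace, tagId);
      return new CombiningStateSketch(key);
    }
  }

  public static void main(String[] args) {
    KeyEncoder encoder = new KeyEncoder();
    StateTableSketch table = new StateTableSketch(encoder);
    CombiningStateSketch state = table.bindCombining("ns", "tag");
    System.out.println(new String(state.bagKey.bytes(), StandardCharsets.UTF_8));
    System.out.println("encode calls: " + encoder.calls);
  }
}
```

Keeping the encoder out of the state constructor means each state class depends only on the key it is handed, which is the consistency the comment asks for.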



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.