gemini-code-assist[bot] commented on code in PR #38458:
URL: https://github.com/apache/beam/pull/38458#discussion_r3224857921


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -651,6 +653,17 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
                         windmillStateCache::forComputation,
                         ID_GENERATOR));
 
+    Fetcher configedFetcher = 
configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configedFetcher

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The variable name `configedFetcher` contains a typo. "Configed" is not a 
standard English word; `fetcher` or `configuredFetcher` would be more 
appropriate.
   
   ```suggestion
       Fetcher fetcher = 
configFetcherComputationStateCacheAndWindmillClient.configFetcher();
       fetcher
   ```



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setIsWindmillServiceDirectPathEnabled(boolean 
isWindmillServiceDirectPathEnabled);
 
+  /**
+   * The maximum size of cached values in bytes. Values larger than this limit 
will not be cached by
+   * the windmill state cache
+   */
+  @Description("The maximum size of cached values in bytes.")

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The description "maximum size of cached values" is slightly ambiguous 
because the implementation checks against the entry's weight, which includes 
cache overhead (e.g., approximately 136 bytes per entry). It would be clearer 
to specify that this limit includes overhead so users can set appropriate 
values.
   
   ```suggestion
     /**
      * The maximum size of cached values in bytes (including cache overhead). 
Values larger than this
      * limit will not be cached by the windmill state cache.
      */
     @Description("The maximum size of cached values in bytes (including cache 
overhead).")
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -689,7 +702,7 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
     return new StreamingDataflowWorker(
         windmillServer,
         clientId,
-        configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
+        configedFetcher,

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Updating the variable name to match the suggested change for the fetcher 
definition.
   
   ```suggestion
           fetcher,
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -651,6 +653,17 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
                         windmillStateCache::forComputation,
                         ID_GENERATOR));
 
+    Fetcher configedFetcher = 
configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configedFetcher
+        .getGlobalConfigHandle()
+        .registerConfigObserver(
+            config -> {
+              if (config.userWorkerJobSettings().hasMaxCachedValueBytes()) {
+                windmillStateCache.setMaxCachedValueBytesOverride(
+                    config.userWorkerJobSettings().getMaxCachedValueBytes());
+              }
+            });

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The configuration observer only updates the override when the field is 
present in the dynamic configuration. If the setting is subsequently removed 
from the dynamic configuration, the worker will continue to use the last 
received override instead of reverting to the default value from the pipeline 
options. Adding an `else` block to reset the override to `-1L` ensures the 
worker correctly handles the removal of the dynamic setting.
   
   ```suggestion
               config -> {
                 if (config.userWorkerJobSettings().hasMaxCachedValueBytes()) {
                   windmillStateCache.setMaxCachedValueBytesOverride(
                       config.userWorkerJobSettings().getMaxCachedValueBytes());
                 } else {
                   windmillStateCache.setMaxCachedValueBytesOverride(-1L);
                 }
               });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to