gemini-code-assist[bot] commented on code in PR #38458:
URL: https://github.com/apache/beam/pull/38458#discussion_r3225082795


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -651,6 +657,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
                         windmillStateCache::forComputation,
                         ID_GENERATOR));
 
+    Fetcher configFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configFetcher
+        .getGlobalConfigHandle()
+        .registerConfigObserver(
+            config -> {
+              windmillStateCache.setMaxCachedValueBytesOverride(
+                  config.userWorkerJobSettings().getMaxCachedValueBytes());
+            });

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `config.userWorkerJobSettings()` might return `null` if the settings are
   absent from the configuration response. Calling `getMaxCachedValueBytes()`
   on a null reference throws a `NullPointerException`, which could crash the
   configuration observer thread. It is safer to check for null before
   accessing the settings.
   
   ```suggestion
               config -> {
                 if (config.userWorkerJobSettings() != null) {
                   windmillStateCache.setMaxCachedValueBytesOverride(
                       config.userWorkerJobSettings().getMaxCachedValueBytes());
                 }
               });
   ```
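
As a minimal sketch of the same guard, the settings accessor can be read once and applied only when it is non-null. Every type here (`GlobalConfig`, `JobSettings`, `applyConfig`, `NO_OVERRIDE`) is a hypothetical stand-in written for illustration, not the actual Beam class, and whether the real accessor can return `null` is not confirmed here.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// All types below are hypothetical stand-ins, not the real Beam classes.
class NullSafeObserverSketch {
  static final long NO_OVERRIDE = -1L;

  /** Stand-in for the settings object that carries the byte limit. */
  static final class JobSettings {
    private final long maxCachedValueBytes;

    JobSettings(long maxCachedValueBytes) {
      this.maxCachedValueBytes = maxCachedValueBytes;
    }

    long getMaxCachedValueBytes() {
      return maxCachedValueBytes;
    }
  }

  /** Stand-in for the global config; the settings reference may be null. */
  static final class GlobalConfig {
    private final JobSettings settings;

    GlobalConfig(JobSettings settings) {
      this.settings = settings;
    }

    JobSettings userWorkerJobSettings() {
      return settings;
    }
  }

  /** Applies the limit only when settings are present; reads the accessor once. */
  static void applyConfig(GlobalConfig config, AtomicLong overrideBytes) {
    Optional.ofNullable(config.userWorkerJobSettings())
        .ifPresent(settings -> overrideBytes.set(settings.getMaxCachedValueBytes()));
  }

  public static void main(String[] args) {
    AtomicLong overrideBytes = new AtomicLong(NO_OVERRIDE);
    // No settings in this config: the override is left untouched.
    applyConfig(new GlobalConfig(null), overrideBytes);
    System.out.println(overrideBytes.get());
    // Settings present: the override takes the configured value.
    applyConfig(new GlobalConfig(new JobSettings(1024)), overrideBytes);
    System.out.println(overrideBytes.get());
  }
}
```

Reading the accessor once also avoids calling it a second time between the null check and the use, which the two-call form in the suggestion does.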



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -99,22 +108,48 @@ public interface Builder {
 
     Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);
 
+    Builder setMaxCachedValueBytes(long maxCachedValueBytes);
+
+    Builder setEnableHistogram(boolean enableHistogram);
+
     WindmillStateCache build();
   }
 
   public static Builder builder() {
-    return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false);
+    return new AutoBuilder_WindmillStateCache_Builder()
+        .setSupportMapViaMultimap(false)
+        .setMaxCachedValueBytes(Long.MAX_VALUE)
+        .setEnableHistogram(true);
+  }
+
+  public void setMaxCachedValueBytesOverride(long limit) {
+    this.maxCachedValueBytesOverride = limit;
+  }
+
+  private long getMaxCachedValueBytesLimit() {
+    long override = maxCachedValueBytesOverride;
+    return override >= 0 ? override : defaultMaxCachedValueBytes;
   }
 
   private EntryStats calculateEntryStats() {
     EntryStats stats = new EntryStats();
     BiConsumer<StateId, StateCacheEntry> consumer =
         (stateId, stateCacheEntry) -> {
           stats.entries++;
-          stats.idWeight += stateId.getWeight();
-          stats.entryWeight += stateCacheEntry.getWeight();
+          long idWeight = stateId.getWeight();
+          stats.idWeight += idWeight;
+          long entryWeight = stateCacheEntry.getWeight();
+          stats.entryWeight += entryWeight;
           stats.entryValues += stateCacheEntry.values.size();
           stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size());
+          if (enableHistogram) {
+            stats.addKeyWeight(idWeight);
+            stats.addEntryWeight(entryWeight);
+            stateCacheEntry.values.forEach(
+                (encodedAddress, weightedValue) -> {
+                  stats.addValueWeight(weightedValue.weight);
+                });

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Iterating over all values in every cache entry to calculate the weight
   distribution increases the complexity of `calculateEntryStats` from
   $O(\text{entries})$ to $O(\text{total values})$. For large caches with many
   values per entry (e.g., large `BagState` or `MapState`), this could cause
   noticeable delays when loading the worker's status page. Since this is for
   a debug page, the cost might be acceptable, but consider whether the
   performance impact has been evaluated for very large states.
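
If the cost does turn out to matter, one possible mitigation is to cap how many values are visited per entry, which bounds the work at entries times the cap. The sketch below is an assumption-laden illustration, not the PR's code: `recordValueWeights`, `maxValuesPerEntry`, and the plain collections of weights are hypothetical, and the resulting histogram becomes a sample biased toward whichever values are iterated first.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical helper; entries are modelled as plain collections of value weights.
class BoundedValueHistogramSketch {

  /**
   * Feeds at most maxValuesPerEntry weights from each entry into the sink.
   * Returns the number of values actually visited.
   */
  static long recordValueWeights(
      Collection<? extends Collection<Long>> entries,
      int maxValuesPerEntry,
      LongConsumer sink) {
    long visited = 0;
    for (Collection<Long> valueWeights : entries) {
      int seen = 0;
      for (long weight : valueWeights) {
        if (seen >= maxValuesPerEntry) {
          break; // stop early so one huge entry cannot dominate the scan
        }
        sink.accept(weight);
        seen++;
        visited++;
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    List<List<Long>> entries = List.of(List.of(1L, 2L, 3L), List.of(4L));
    long visited = recordValueWeights(entries, 2, w -> System.out.println("weight " + w));
    System.out.println("visited " + visited);
  }
}
```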



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -413,7 +506,15 @@ public <T extends State> void put(
     }
 
     public void persist() {
-      localCache.forEach(stateCache::put);
+      long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit();
+      localCache.forEach(
+          (id, entry) -> {
+            if (entry.getWeight() <= limit) {
+              stateCache.put(id, entry);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The limit is compared against `entry.getWeight()`, which includes the
   overhead of the `StateCacheEntry` and `StateId` (approximately 136 bytes).
   As a result, if a user sets a small limit (e.g., 100 bytes), no values will
   ever be cached. The documentation in `DataflowStreamingPipelineOptions`
   says "maximum size of cached values", which might lead users to believe the
   limit applies only to the payload. Consider clarifying the documentation,
   or adjusting the logic to account only for the value size if that was the
   intent.
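
The difference between the two interpretations can be shown with a small sketch. The class, method names, and the fixed `ASSUMED_OVERHEAD_BYTES` constant are hypothetical; the constant simply reuses the approximate 136-byte figure quoted above, and the real overhead may differ per entry.

```java
// Hypothetical illustration of the two ways to apply the limit.
class PersistLimitSketch {
  // Assumption: fixed per-entry overhead, taken from the approximate figure above.
  static final long ASSUMED_OVERHEAD_BYTES = 136L;

  /** Limit applied to the whole entry weight (payload plus overhead). */
  static boolean fitsByTotalWeight(long payloadBytes, long limitBytes) {
    return payloadBytes + ASSUMED_OVERHEAD_BYTES <= limitBytes;
  }

  /** Limit applied to the payload only. */
  static boolean fitsByPayload(long payloadBytes, long limitBytes) {
    return payloadBytes <= limitBytes;
  }

  public static void main(String[] args) {
    long payloadBytes = 50L;
    long limitBytes = 100L;
    // 50 + 136 = 186 > 100, so the entry is rejected under the total-weight rule.
    System.out.println(fitsByTotalWeight(payloadBytes, limitBytes));
    // 50 <= 100, so the same entry is accepted under the payload-only rule.
    System.out.println(fitsByPayload(payloadBytes, limitBytes));
  }
}
```

With a 100-byte limit, even an empty payload fails the total-weight check under this assumption, which is the surprise described above.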



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
