scwhittle commented on code in PR #38458:
URL: https://github.com/apache/beam/pull/38458#discussion_r3233345938


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setIsWindmillServiceDirectPathEnabled(boolean 
isWindmillServiceDirectPathEnabled);
 
+  /**
+   * The maximum size of cached values in bytes. Values larger than this limit 
will not be cached by

Review Comment:
   values is a little confusing with ValueState. I believe this applies to 
other state types.  Would it be better to have separate values for different 
state types instead? We could for example choose to cache large value states 
but not large bags.
   
   I think side inputs is a separate cache but if not we should likely exclude 
those from this limit.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -99,22 +108,48 @@ public interface Builder {
 
     Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);
 
+    Builder setMaxCachedValueBytes(long maxCachedValueBytes);
+
+    Builder setEnableHistogram(boolean enableHistogram);
+
     WindmillStateCache build();
   }
 
   public static Builder builder() {
-    return new 
AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false);
+    return new AutoBuilder_WindmillStateCache_Builder()
+        .setSupportMapViaMultimap(false)
+        .setMaxCachedValueBytes(Long.MAX_VALUE)
+        .setEnableHistogram(true);
+  }
+
+  public void setMaxCachedValueBytesOverride(long limit) {
+    this.maxCachedValueBytesOverride = limit;
+  }
+
+  private long getMaxCachedValueBytesLimit() {
+    long override = maxCachedValueBytesOverride;
+    return override >= 0 ? override : defaultMaxCachedValueBytes;
   }
 
   private EntryStats calculateEntryStats() {
     EntryStats stats = new EntryStats();
     BiConsumer<StateId, StateCacheEntry> consumer =
         (stateId, stateCacheEntry) -> {
           stats.entries++;
-          stats.idWeight += stateId.getWeight();
-          stats.entryWeight += stateCacheEntry.getWeight();
+          long idWeight = stateId.getWeight();
+          stats.idWeight += idWeight;
+          long entryWeight = stateCacheEntry.getWeight();
+          stats.entryWeight += entryWeight;
           stats.entryValues += stateCacheEntry.values.size();
           stats.maxEntryValues = Math.max(stats.maxEntryValues, 
stateCacheEntry.values.size());
+          if (enableHistogram) {
+            stats.addKeyWeight(idWeight);
+            stats.addEntryWeight(entryWeight);
+            stateCacheEntry.values.forEach(
+                (encodedAddress, weightedValue) -> {

Review Comment:
   This is on the order of tags within a key, correct? It seems like that there 
will more likely be more stateCacheEntry to iterate over than 
stateCacheEntry.values so I don't think it is worth precalculating since we 
arlready iterate over stateCacheEntrys



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -180,6 +248,31 @@ private static class EntryStats {
     long entryWeight;
     long entryValues;
     long maxEntryValues;
+    long[] entryWeightHistogram = new long[7];
+    long[] valueWeightHistogram = new long[7];
+    long[] keyWeightHistogram = new long[7];
+
+    void addEntryWeight(long weight) {
+      entryWeightHistogram[getBucket(weight)]++;
+    }
+
+    void addValueWeight(long weight) {
+      valueWeightHistogram[getBucket(weight)]++;
+    }
+
+    void addKeyWeight(long weight) {
+      keyWeightHistogram[getBucket(weight)]++;
+    }
+
+    private int getBucket(long weight) {

Review Comment:
   create a SimpleByteHistogram class? The buckets here and the status 
rendering woudl be better grouped in a class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to