This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 233ebd8e2bc [Dataflow Streaming] Add a job setting to limit value size 
in windmill state cache (#38458)
233ebd8e2bc is described below

commit 233ebd8e2bc3c7041a3325fd1a6018c88631a940
Author: Arun Pandian <[email protected]>
AuthorDate: Mon May 18 01:52:33 2026 -0700

    [Dataflow Streaming] Add a job setting to limit value size in windmill 
state cache (#38458)
    
    * Add key, value size distribution to state cache stats
    * Add max value size limit to status page
---
 .../options/DataflowStreamingPipelineOptions.java  |  10 ++
 .../dataflow/worker/StreamingDataflowWorker.java   |  17 ++-
 .../dataflow/worker/util/SimpleByteHistogram.java  |  43 ++++++++
 .../worker/windmill/state/WindmillStateCache.java  | 122 +++++++++++++++++----
 .../worker/util/SimpleByteHistogramTest.java       |  51 +++++++++
 .../windmill/state/WindmillStateCacheTest.java     |  62 +++++++++++
 .../worker/windmill/src/main/proto/windmill.proto  |   2 +
 7 files changed, 285 insertions(+), 22 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 9727048e47a..90375ad445a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setIsWindmillServiceDirectPathEnabled(boolean 
isWindmillServiceDirectPathEnabled);
 
+  /**
+   * The maximum size of cached entries in bytes. Entries (e.g. values, bags) 
larger than this limit
+   * will not be cached by the windmill state cache.
+   */
+  @Description("The maximum size of cached entries in bytes.")
+  @Default.Long(Long.MAX_VALUE)
+  Long getMaxWindmillStateCacheEntryBytes();
+
+  void setMaxWindmillStateCacheEntryBytes(Long value);
+
   /**
    * Factory for creating local Windmill address. Reads from system propery 
'windmill.hostport' for
    * backwards compatibility.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f5e5adab155..4d070da995b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -50,6 +50,7 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
 import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
@@ -113,6 +114,7 @@ import org.apache.beam.sdk.fn.JvmInitializers;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.util.construction.CoderTranslation;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
@@ -633,6 +635,10 @@ public final class StreamingDataflowWorker {
         WindmillStateCache.builder()
             .setSizeMb(options.getWorkerCacheMb())
             .setSupportMapViaMultimap(options.isEnableStreamingEngine())
+            
.setMaxCachedEntryBytes(options.getMaxWindmillStateCacheEntryBytes())
+            .setEnableHistogram(
+                !ExperimentalOptions.hasExperiment(
+                    options, "disable_windmill_user_state_cache_histogram"))
             .build();
 
     GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
@@ -651,6 +657,15 @@ public final class StreamingDataflowWorker {
                         windmillStateCache::forComputation,
                         ID_GENERATOR));
 
+    Fetcher configFetcher = 
configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configFetcher
+        .getGlobalConfigHandle()
+        .registerConfigObserver(
+            config -> {
+              windmillStateCache.setMaxCachedEntryBytesOverride(
+                  config.userWorkerJobSettings().getMaxCachedEntryBytes());
+            });
+
     ComputationStateCache computationStateCache =
         
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
     WindmillServerStub windmillServer =
@@ -689,7 +704,7 @@ public final class StreamingDataflowWorker {
     return new StreamingDataflowWorker(
         windmillServer,
         clientId,
-        configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
+        configFetcher,
         computationStateCache,
         windmillStateCache,
         workExecutor,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
new file mode 100644
index 00000000000..6b90ca8df9e
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+/** A simple histogram to track byte sizes. */
+public class SimpleByteHistogram {
+  private final long[] buckets = new long[7];
+
+  public void add(long weight) {
+    buckets[getBucket(weight)]++;
+  }
+
+  private int getBucket(long weight) {
+    if (weight < 128) return 0;
+    if (weight < 256) return 1;
+    if (weight < 512) return 2;
+    if (weight < 1024) return 3;
+    if (weight < 10 * 1024) return 4;
+    if (weight < 1024 * 1024) return 5;
+    return 6;
+  }
+
+  public String format() {
+    return String.format(
+        "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]",
+        buckets[0], buckets[1], buckets[2], buckets[3], buckets[4], 
buckets[5], buckets[6]);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index 07c9599c866..7515db00085 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.*;
 import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import org.apache.beam.runners.dataflow.worker.util.SimpleByteHistogram;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.util.Weighted;
@@ -75,9 +76,18 @@ public class WindmillStateCache implements 
StatusDataProvider {
   private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex;
   private final long workerCacheBytes; // Copy workerCacheMb and convert to 
bytes.
   private final boolean supportMapViaMultimap;
-
-  WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) {
+  private final long defaultMaxCachedEntryBytes;
+  private final boolean enableHistogram;
+  private volatile long maxCachedEntryBytesOverride = -1L;
+
+  WindmillStateCache(
+      long sizeMb,
+      boolean supportMapViaMultimap,
+      long maxCachedEntryBytes,
+      boolean enableHistogram) {
     this.workerCacheBytes = sizeMb * MEGABYTES;
+    this.defaultMaxCachedEntryBytes = maxCachedEntryBytes;
+    this.enableHistogram = enableHistogram;
     int stateCacheConcurrencyLevel =
         Math.max(STATE_CACHE_CONCURRENCY_LEVEL, 
Runtime.getRuntime().availableProcessors());
     this.stateCache =
@@ -99,11 +109,27 @@ public class WindmillStateCache implements 
StatusDataProvider {
 
     Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);
 
+    Builder setMaxCachedEntryBytes(long maxCachedEntryBytes);
+
+    Builder setEnableHistogram(boolean enableHistogram);
+
     WindmillStateCache build();
   }
 
   public static Builder builder() {
-    return new 
AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false);
+    return new AutoBuilder_WindmillStateCache_Builder()
+        .setSupportMapViaMultimap(false)
+        .setMaxCachedEntryBytes(Long.MAX_VALUE)
+        .setEnableHistogram(true);
+  }
+
+  public void setMaxCachedEntryBytesOverride(long limit) {
+    this.maxCachedEntryBytesOverride = limit;
+  }
+
+  private long getMaxCachedEntryBytesLimit() {
+    long override = maxCachedEntryBytesOverride;
+    return override >= 0 ? override : defaultMaxCachedEntryBytes;
   }
 
   private EntryStats calculateEntryStats() {
@@ -111,10 +137,20 @@ public class WindmillStateCache implements 
StatusDataProvider {
     BiConsumer<StateId, StateCacheEntry> consumer =
         (stateId, stateCacheEntry) -> {
           stats.entries++;
-          stats.idWeight += stateId.getWeight();
-          stats.entryWeight += stateCacheEntry.getWeight();
+          long idWeight = stateId.getWeight();
+          stats.idWeight += idWeight;
+          long entryWeight = stateCacheEntry.getWeight();
+          stats.entryWeight += entryWeight;
           stats.entryValues += stateCacheEntry.values.size();
           stats.maxEntryValues = Math.max(stats.maxEntryValues, 
stateCacheEntry.values.size());
+          if (enableHistogram) {
+            stats.addKeyWeight(idWeight);
+            stats.addEntryWeight(entryWeight);
+            stateCacheEntry.values.forEach(
+                (encodedAddress, weightedValue) -> {
+                  stats.addValueWeight(weightedValue.weight);
+                });
+          }
         };
     stateCache.asMap().forEach(consumer);
     return stats;
@@ -142,23 +178,44 @@ public class WindmillStateCache implements 
StatusDataProvider {
   @Override
   public void appendSummaryHtml(PrintWriter response) {
     response.println("Cache Stats: <br><table>");
-    response.println(
-        "<tr><th>Hit Ratio</th><th>Evictions</th><th>Entries</th>"
-            + "<th>Entry Values</th><th>Max Entry Values</th>"
-            + "<th>Id Weight</th><th>Entry Weight</th><th>Max 
Weight</th><th>Keys</th>"
-            + "</tr><tr>");
     CacheStats cacheStats = stateCache.stats();
     EntryStats entryStats = calculateEntryStats();
-    response.println("<td>" + cacheStats.hitRate() + "</td>");
-    response.println("<td>" + cacheStats.evictionCount() + "</td>");
-    response.println("<td>" + entryStats.entries + "(" + stateCache.size() + " 
inc. weak) </td>");
-    response.println("<td>" + entryStats.entryValues + "</td>");
-    response.println("<td>" + entryStats.maxEntryValues + "</td>");
-    response.println("<td>" + entryStats.idWeight / MEGABYTES + "MB</td>");
-    response.println("<td>" + entryStats.entryWeight / MEGABYTES + "MB</td>");
-    response.println("<td>" + getMaxWeight() / MEGABYTES + "MB</td>");
-    response.println("<td>" + keyIndex.size() + "</td>");
-    response.println("</tr></table><br>");
+
+    response.println("<tr><th>Hit Ratio</th><td>" + cacheStats.hitRate() + 
"</td></tr>");
+    response.println("<tr><th>Evictions</th><td>" + cacheStats.evictionCount() 
+ "</td></tr>");
+    response.println(
+        "<tr><th>Entries</th><td>"
+            + entryStats.entries
+            + " ("
+            + stateCache.size()
+            + " inc. weak)</td></tr>");
+    response.println("<tr><th>Entry Values</th><td>" + entryStats.entryValues 
+ "</td></tr>");
+    response.println(
+        "<tr><th>Max Entry Values</th><td>" + entryStats.maxEntryValues + 
"</td></tr>");
+    response.println(
+        "<tr><th>Id Weight</th><td>" + entryStats.idWeight / MEGABYTES + 
"MB</td></tr>");
+    response.println(
+        "<tr><th>Entry Weight</th><td>" + entryStats.entryWeight / MEGABYTES + 
"MB</td></tr>");
+    response.println("<tr><th>Max Weight</th><td>" + getMaxWeight() / 
MEGABYTES + "MB</td></tr>");
+    response.println("<tr><th>Keys</th><td>" + keyIndex.size() + "</td></tr>");
+    response.println(
+        "<tr><th>Entry Size Limit</th><td>" + getMaxCachedEntryBytesLimit() + 
" bytes</td></tr>");
+    if (enableHistogram) {
+      response.println(
+          "<tr><th>Entry Weight Dist</th><td>"
+              + entryStats.entryWeightHistogram.format()
+              + "</td></tr>");
+      response.println(
+          "<tr><th>Value Weight Dist</th><td>"
+              + entryStats.valueWeightHistogram.format()
+              + "</td></tr>");
+      response.println(
+          "<tr><th>Key Weight Dist</th><td>"
+              + entryStats.keyWeightHistogram.format()
+              + "</td></tr>");
+    }
+
+    response.println("</table><br>");
   }
 
   public BaseStatusServlet statusServlet() {
@@ -180,6 +237,21 @@ public class WindmillStateCache implements 
StatusDataProvider {
     long entryWeight;
     long entryValues;
     long maxEntryValues;
+    SimpleByteHistogram entryWeightHistogram = new SimpleByteHistogram();
+    SimpleByteHistogram valueWeightHistogram = new SimpleByteHistogram();
+    SimpleByteHistogram keyWeightHistogram = new SimpleByteHistogram();
+
+    void addEntryWeight(long weight) {
+      entryWeightHistogram.add(weight);
+    }
+
+    void addValueWeight(long weight) {
+      valueWeightHistogram.add(weight);
+    }
+
+    void addKeyWeight(long weight) {
+      keyWeightHistogram.add(weight);
+    }
   }
 
   /**
@@ -413,7 +485,15 @@ public class WindmillStateCache implements 
StatusDataProvider {
     }
 
     public void persist() {
-      localCache.forEach(stateCache::put);
+      long limit = WindmillStateCache.this.getMaxCachedEntryBytesLimit();
+      localCache.forEach(
+          (id, entry) -> {
+            if (entry.getWeight() <= limit) {
+              stateCache.put(id, entry);
+            } else {
+              stateCache.invalidate(id);
+            }
+          });
     }
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
new file mode 100644
index 00000000000..252300a1955
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SimpleByteHistogram}. */
+@RunWith(JUnit4.class)
+public class SimpleByteHistogramTest {
+
+  @Test
+  public void testHistogram() {
+    SimpleByteHistogram histogram = new SimpleByteHistogram();
+    histogram.add(10); // <128B
+    histogram.add(127); // <128B
+    histogram.add(128); // <256B
+    histogram.add(255); // <256B
+    histogram.add(256); // <512B
+    histogram.add(511); // <512B
+    histogram.add(512); // <1KB
+    histogram.add(1023); // <1KB
+    histogram.add(1024); // <10KB
+    histogram.add(10240 - 1); // <10KB
+    histogram.add(10240); // <1MB
+    histogram.add(1048576 - 1); // <1MB
+    histogram.add(1048576); // >=1MB
+    histogram.add(2000000); // >=1MB
+
+    String expected = "[<128B:2, <256B:2, <512B:2, <1KB:2, <10KB:2, <1MB:2, 
>=1MB:2]";
+    assertEquals(expected, histogram.format());
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index bbb8e4c93c0..2d3d9b5ccff 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -270,6 +270,68 @@ public class WindmillStateCacheTest {
     assertEquals(400 * MEGABYTES, cache.getMaxWeight());
   }
 
+  @Test
+  public void testMaxCachedEntryBytes() throws Exception {
+    cache.setMaxCachedEntryBytesOverride(
+        100); // Set limit to 100 bytes; the per-entry cache overhead alone is 136.
+
+    WindmillStateCache.ForKeyAndFamily keyCache =
+        cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
1L).forFamily(STATE_FAMILY);
+
+    TestStateTag tag1 = new TestStateTag("tag1");
+    TestStateTag tag2 = new TestStateTag("tag2");
+
+    putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"), 
10);
+    keyCache.persist();
+
+    // It should not be in global cache because it's too large.
+    keyCache =
+        cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
2L).forFamily(STATE_FAMILY);
+    assertEquals(Optional.empty(), getFromCache(keyCache, 
StateNamespaces.global(), tag1));
+
+    // Now set limit larger.
+    cache.setMaxCachedEntryBytesOverride(1000);
+
+    putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 
10);
+    keyCache.persist();
+
+    // It should be in global cache.
+    keyCache =
+        cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
3L).forFamily(STATE_FAMILY);
+    assertEquals(
+        Optional.of(new TestState("g2")), getFromCache(keyCache, 
StateNamespaces.global(), tag2));
+
+    // Now update it to be larger than limit.
+    putInCache(keyCache, StateNamespaces.global(), tag2, new 
TestState("g2_large"), 2000);
+    keyCache.persist();
+
+    // It should be removed from global cache.
+    keyCache =
+        cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
4L).forFamily(STATE_FAMILY);
+    assertEquals(Optional.empty(), getFromCache(keyCache, 
StateNamespaces.global(), tag2));
+  }
+
+  @Test
+  public void testDisableHistogram() throws Exception {
+    WindmillStateCache noHistogramCache =
+        
WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build();
+    WindmillStateCache.ForKeyAndFamily keyCache =
+        noHistogramCache
+            .forComputation(COMPUTATION)
+            .forKey(COMPUTATION_KEY, 0L, 1L)
+            .forFamily(STATE_FAMILY);
+
+    putInCache(
+        keyCache, StateNamespaces.global(), new TestStateTag("tag1"), new 
TestState("g1"), 2);
+    keyCache.persist();
+
+    java.io.StringWriter writer = new java.io.StringWriter();
+    noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer));
+    String summary = writer.toString();
+
+    org.junit.Assert.assertFalse(summary.contains("Entry Weight Dist"));
+  }
+
   /** Verifies that values are cached in the appropriate namespaces. */
   @Test
   public void testInvalidation() throws Exception {
diff --git 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index b7579cbacb8..58e4f7df3c3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings {
   optional ConnectivityType connectivity_type = 4
     [default = CONNECTIVITY_TYPE_DEFAULT];
 
+  optional int64 max_cached_entry_bytes = 5 [default = -1];
+
   reserved 1, 2;
 }
 

Reply via email to